Skip to content

Fix: Aws Cloudtrail delay (1193)#1723

Open
vg-svitla wants to merge 2 commits intodevelopfrom
fix/aws_cloudtrail_connector
Open

Fix: Aws Cloudtrail delay (1193)#1723
vg-svitla wants to merge 2 commits intodevelopfrom
fix/aws_cloudtrail_connector

Conversation

@vg-svitla
Copy link
Collaborator

@vg-svitla vg-svitla commented Dec 17, 2025

Relates to 1193

Summary by Sourcery

Adjust S3 log forwarding to be rate-aware by tracking and returning the number of forwarded CloudTrail records per run and sleeping only when no records are processed.

Bug Fixes:

  • Prevent unnecessary polling delays for AWS CloudTrail logs by skipping the frequency sleep when records were forwarded in the current cycle.

Enhancements:

  • Optimize event chunking by using a simple counter to decide when to emit chunks instead of relying on len() calls.
  • Extend the S3 logs worker to return the total number of processed records, enabling more controlled scheduling behavior.

Tests:

  • Update CloudTrail logs worker tests to assert the total number of forwarded events and align expectations with the new return value.

Chores:

  • Clean up and reorder imports in various AWS modules and tests, and remove unused asset connector imports from the main entrypoint.

@vg-svitla vg-svitla requested review from a team and squioc December 17, 2025 16:11
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Dec 17, 2025

Reviewer's Guide

Optimize S3 CloudTrail event forwarding by avoiding unnecessary sleep when records are available, add a lightweight chunking counter, and adjust tests/imports accordingly.

Sequence diagram for updated AwsS3QueuedConnector run and forward_events flow

sequenceDiagram
    participant Runner as RunLoop
    participant Connector as AbstractAwsS3QueuedConnector
    participant S3 as AwsS3Bucket
    participant Dest as DestinationPlatform

    Runner->>Connector: run()
    loop until alive_is_set
        Connector->>Connector: commit_marker()
        Connector->>Connector: forward_events() int
        activate Connector
        Connector->>S3: _fetch_next_objects(marker)
        S3-->>Connector: objects
        Connector->>S3: _fetch_events(bucket_name, _move_marker(objects))
        S3-->>Connector: events
        Connector->>Connector: _chunk_events(events, chunk_size)
        loop for each chunk
            Connector->>Dest: send_records(records, event_name, batch_size)
            Dest-->>Connector: ack
        end
        Connector-->>Runner: total_records
        deactivate Connector
        alt total_records == 0
            Connector->>Connector: sleep(configuration.frequency)
        else total_records > 0
            Note right of Connector: No sleep to minimize CloudTrail delay
        end
    end
Loading

Updated class diagram for AbstractAwsS3QueuedConnector event handling

classDiagram
    class AbstractAwsS3QueuedConnector {
        +configuration: AwsS3FetcherConfiguration
        +bucket_name: str
        +marker: str
        +alive: Event
        +trigger: Trigger
        +log(message: str, level: str) void
        +log_exception(ex: Exception, message: str) void
        +send_records(records: list, event_name: str, batch_size: int) void
        +commit_marker() void
        +_fetch_next_objects(marker: str) list
        +_move_marker(objects: list) str
        +_fetch_events(bucket_name: str, marker: str) Sequence
        +_chunk_events(events: Sequence, chunk_size: int) Generator~list~
        +forward_events() int
        +stop() void
        +run() void
    }

    class AwsS3FetcherConfiguration {
        +frequency: int
        +chunk_size: int
    }

    AbstractAwsS3QueuedConnector --> AwsS3FetcherConfiguration : uses

    class _chunk_events_changes {
        +counter: int
        +for_each_event(events)
        +if counter >= chunk_size then yield chunk
        +reset counter on new chunk
    }

    AbstractAwsS3QueuedConnector --> _chunk_events_changes : implements_chunking_logic
Loading

File-Level Changes

Change Details Files
Optimize chunking of events to avoid repeated len() calls on the growing chunk list.
  • Introduce a simple integer counter to track chunk size instead of calling len() on the chunk list each iteration.
  • Reset both the chunk list and counter when emitting a full chunk.
  • Maintain existing behavior for yielding a final partially filled chunk.
AWS/connectors/s3/logs/base.py
Make forward_events report how many records were forwarded and use that to control sleep behavior in the worker loop.
  • Change forward_events to return the total number of forwarded records instead of None.
  • Accumulate per-chunk record counts and log using the precomputed count.
  • In run(), call forward_events(), then only sleep for configuration.frequency when no records were forwarded in that iteration.
  • Keep global exception handling around forward_events and re-raise unexpected exceptions.
AWS/connectors/s3/logs/base.py
Update tests to validate the new forward_events return value and clean up imports/ordering in a few modules.
  • In CloudTrail logs tests, capture the total_events returned from forward_events and assert it equals the number of records actually sent.
  • Refactor test expectations to compute the expected record list once and reuse it for both total count and content assertions.
  • Reorder or group imports consistently in several test and helper modules, including account validator, users assets tests, and s3_wrapper.
  • Remove unused imports from AWS/main.py related to asset connectors and account validator.
AWS/tests/connectors/s3/logs/test_cloudtrail_logs_trigger.py
AWS/tests/asset_connector/test_account_validator.py
AWS/tests/asset_connector/test_users_assets.py
AWS/aws_helpers/s3_wrapper.py
AWS/main.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • The change to _chunk_events adds a counter to avoid calling len(chunk), but len on a list is already O(1) in CPython and still used in the final if condition; consider either reverting to len(chunk) entirely or consistently using the counter and updating the comment to avoid implying a performance issue that isn’t there.
  • With the new run loop behavior, when records are continuously available there is no sleep at all, which may result in a tight loop consuming more CPU than before; consider keeping a minimal sleep or a backoff mechanism even when records are forwarded.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The change to `_chunk_events` adds a `counter` to avoid calling `len(chunk)`, but `len` on a list is already O(1) in CPython and still used in the final `if` condition; consider either reverting to `len(chunk)` entirely or consistently using the counter and updating the comment to avoid implying a performance issue that isn’t there.
- With the new `run` loop behavior, when records are continuously available there is no sleep at all, which may result in a tight loop consuming more CPU than before; consider keeping a minimal sleep or a backoff mechanism even when records are forwarded.

## Individual Comments

### Comment 1
<location> `AWS/connectors/s3/logs/base.py:214-218` </location>
<code_context>
         # get and forward events
         try:
             events = self._fetch_events(self.bucket_name, self._move_marker(objects))
             chunks = self._chunk_events(list(events), self.configuration.chunk_size or 10000)
             for records in chunks:
-                self.log(message=f"forwarding {len(records)} records", level="info")
</code_context>

<issue_to_address>
**suggestion (performance):** Avoid materializing all events into a list before chunking to reduce memory pressure.

Wrapping `self._fetch_events(...)` in `list(...)` removes the benefits of streaming and can cause high memory usage if many events are returned. Since `_chunk_events` only iterates over the sequence, you can pass the iterator directly, e.g. `chunks = self._chunk_events(events, ...)`, to preserve streaming behavior and reduce memory pressure.

```suggestion
        # get and forward events
        try:
            events = self._fetch_events(self.bucket_name, self._move_marker(objects))
            chunks = self._chunk_events(events, self.configuration.chunk_size or 10000)
            for records in chunks:
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Copy link
Collaborator

@squioc squioc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello,

Thank you about the work.
However, I'm doubt about the impact of these changes. The code belong to deprecated triggers

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I doubt about the changes.

The triggers (CloudTrailLogsTrigger and FlowlogRecordsTrigger), related to this code, are deprecated.

module.register(CloudTrailLogsTrigger, "cloudtrail_logs_trigger")
module.register(FlowlogRecordsTrigger, "flowlog_records_trigger")

"docker_parameters": "cloudtrail_logs_trigger",
"name": "Fetch CloudTrail logs (deprecated)",

"docker_parameters": "flowlog_records_trigger",
"name": "Fetch Flowlog records (deprecated)",

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants