Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AWS/asset_connector/device_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sekoia_automation.asset_connector.models.ocsf.group import Group
from sekoia_automation.asset_connector.models.ocsf.organization import Organization
from sekoia_automation.storage import PersistentJSON

from aws_helpers.base import AWSModule


Expand Down
1 change: 1 addition & 0 deletions AWS/asset_connector/users_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UserTypeStr,
)
from sekoia_automation.storage import PersistentJSON

from aws_helpers.base import AWSModule


Expand Down
1 change: 1 addition & 0 deletions AWS/aws_helpers/account_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import boto3
from sekoia_automation.account_validator import AccountValidator

from aws_helpers.base import AWSModule


Expand Down
2 changes: 1 addition & 1 deletion AWS/aws_helpers/s3_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic.v1 import Field
from sekoia_automation.aio.helpers.aws.client import AwsClient, AwsConfiguration

from aws_helpers.utils import is_gzip_compressed, async_gzip_open, AsyncReader
from aws_helpers.utils import AsyncReader, async_gzip_open, is_gzip_compressed


class S3Configuration(AwsConfiguration):
Expand Down
1 change: 0 additions & 1 deletion AWS/aws_helpers/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import io
import gzip
from abc import abstractmethod
from concurrent.futures import Executor
Expand Down
2 changes: 1 addition & 1 deletion AWS/connectors/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from aws_helpers.s3_wrapper import S3Configuration, S3Wrapper
from aws_helpers.sqs_wrapper import SqsConfiguration, SqsWrapper
from aws_helpers.utils import normalize_s3_key, AsyncReader
from aws_helpers.utils import AsyncReader, normalize_s3_key
from connectors import AbstractAwsConnector, AbstractAwsConnectorConfiguration
from connectors.metrics import INCOMING_EVENTS

Expand Down
25 changes: 18 additions & 7 deletions AWS/connectors/s3/logs/base.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I doubt about the changes.

The triggers (CloudTrailLogsTrigger and FlowlogRecordsTrigger), related to this code, are deprecated.

module.register(CloudTrailLogsTrigger, "cloudtrail_logs_trigger")
module.register(FlowlogRecordsTrigger, "flowlog_records_trigger")

"docker_parameters": "cloudtrail_logs_trigger",
"name": "Fetch CloudTrail logs (deprecated)",

"docker_parameters": "flowlog_records_trigger",
"name": "Fetch Flowlog records (deprecated)",

Original file line number Diff line number Diff line change
Expand Up @@ -185,39 +185,49 @@ def _chunk_events(events: Sequence[Any], chunk_size: int) -> Generator[list[Any]
"""
chunk: list[Any] = []

# in order to reduce pressure for len(chunk) add simple counter
counter = 0

# iter over the events
for event in events:
# if the chnuk is full
if len(chunk) >= chunk_size:
# if the chunk is full
if counter >= chunk_size:
# yield the current chunk and create a new one
yield chunk
chunk = []
counter = 0

# add the event to the current chunk
chunk.append(event)
counter += 1

# if the last chunk is not empty
if len(chunk) > 0:
# yield the last chunk
yield chunk

def forward_events(self) -> None:
def forward_events(self) -> int:
# get next objects
objects = self._fetch_next_objects(self.marker)
total_records = 0

# get and forward events
try:
events = self._fetch_events(self.bucket_name, self._move_marker(objects))
chunks = self._chunk_events(list(events), self.configuration.chunk_size or 10000)
for records in chunks:
self.log(message=f"forwarding {len(records)} records", level="info")
records_count = len(records)
total_records += records_count
self.log(message=f"forwarding {records_count} records", level="info")
self.send_records(
records=list(records),
event_name=f"{self.trigger.name.lower().replace(' ', '-')}_{str(time.time())}",
)
except Exception as ex:
self.log_exception(ex, message=f"Failed to forward events from {self.bucket_name}")

return total_records

def stop(self) -> None:
self.alive.set()

Expand All @@ -238,13 +248,14 @@ def run(self) -> None:
while not self.alive.is_set():
try:
self.commit_marker()
self.forward_events()
forwarded_records = self.forward_events()
if forwarded_records == 0:
# Sleep only if there is no records to forward from current fetch
time.sleep(self.configuration.frequency)
except Exception as ex:
self.log_exception(ex, message="An unknown exception occurred")
raise

time.sleep(self.configuration.frequency)


class AwsS3FetcherConfiguration(BaseModel):
frequency: int = 60
Expand Down
6 changes: 3 additions & 3 deletions AWS/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from sekoia_automation.loguru.config import init_logging

from asset_connector.device_assets import AwsDeviceAssetConnector
from asset_connector.users_assets import AwsUsersAssetConnector
from aws_helpers.account_validator import AwsAccountValidator
from connectors import AwsModule
from connectors.s3.logs.trigger_cloudtrail_logs import CloudTrailLogsTrigger
from connectors.s3.logs.trigger_flowlog_records import FlowlogRecordsTrigger
Expand All @@ -12,9 +15,6 @@
from connectors.s3.trigger_s3_ocsf_parquet import AwsS3OcsfTrigger
from connectors.s3.trigger_s3_records import AwsS3RecordsTrigger
from connectors.trigger_sqs_messages import AwsSqsMessagesTrigger
from asset_connector.device_assets import AwsDeviceAssetConnector
from asset_connector.users_assets import AwsUsersAssetConnector
from aws_helpers.account_validator import AwsAccountValidator

if __name__ == "__main__":
init_logging()
Expand Down
6 changes: 4 additions & 2 deletions AWS/tests/asset_connector/test_account_validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pytest
from unittest.mock import Mock, patch

import pytest

from aws_helpers.account_validator import AwsAccountValidator
from aws_helpers.base import AWSModule, AWSConfiguration
from aws_helpers.base import AWSConfiguration, AWSModule


class TestAwsAccountValidator:
Expand Down
17 changes: 9 additions & 8 deletions AWS/tests/asset_connector/test_device_assets.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import pytest
from unittest import mock
from datetime import datetime
from unittest import mock

import pytest
import pytz
from botocore.exceptions import ClientError, NoCredentialsError
from dateutil.parser import isoparse
from sekoia_automation.asset_connector.models.ocsf.device import (
Device,
DeviceOCSFModel,
OperatingSystem,
OSTypeId,
OSTypeStr,
DeviceTypeStr,
DeviceTypeId,
DeviceTypeStr,
NetworkInterface,
NetworkInterfaceTypeId,
NetworkInterfaceTypeStr,
OperatingSystem,
OSTypeId,
OSTypeStr,
)
from sekoia_automation.asset_connector.models.ocsf.group import Group
from sekoia_automation.asset_connector.models.ocsf.organization import Organization
from sekoia_automation.module import Module
from dateutil.parser import isoparse

from asset_connector.device_assets import AwsDevice, AwsDeviceAssetConnector
from connectors import AwsModule, AwsModuleConfiguration
from asset_connector.device_assets import AwsDeviceAssetConnector, AwsDevice


@pytest.fixture
Expand Down
12 changes: 7 additions & 5 deletions AWS/tests/asset_connector/test_users_assets.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import pytest
from unittest import mock

import pytest
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError
from dateutil.parser import isoparse
from sekoia_automation.asset_connector.models.ocsf.user import UserOCSFModel
from sekoia_automation.module import Module
from dateutil.parser import isoparse
from botocore.exceptions import NoCredentialsError, ClientError, BotoCoreError

from asset_connector.users_assets import AwsUser, AwsUsersAssetConnector
from connectors import AwsModule, AwsModuleConfiguration
from asset_connector.users_assets import AwsUsersAssetConnector, AwsUser


@pytest.fixture
Expand Down Expand Up @@ -199,9 +200,10 @@ def test_update_checkpoint_integration(test_aws_users_asset_connector):
# Test AwsUser class
def test_aws_user_initialization():
"""Test AwsUser class initialization."""
from sekoia_automation.asset_connector.models.ocsf.user import User, Account, AccountTypeStr, AccountTypeId
from datetime import datetime

from sekoia_automation.asset_connector.models.ocsf.user import Account, AccountTypeId, AccountTypeStr, User

# Create a test user
account = Account(
name="testuser",
Expand Down
7 changes: 5 additions & 2 deletions AWS/tests/connectors/s3/logs/test_cloudtrail_logs_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,22 @@ def test_forward_events(
aws_mock:
"""
with mocked_client.handler_for("s3", S3Mock):
worker.forward_events()
total_events = worker.forward_events()
calls = {
call.kwargs["event_name"]: call.kwargs
for call in trigger.send_event.call_args_list
if call.kwargs.get("event_name")
}
expected_result = [record for obj in S3Objects.values() for record in orjson.loads(obj)["Records"]]

assert total_events == len(expected_result)
assert len(calls) == 1
assert all([name.startswith("aws-cloudtrail_") for name in calls.keys()])
assert [
record
for call in calls.values()
for record in read_file(symphony_storage, call["directory"], call["event"]["records_path"])
] == [record for obj in S3Objects.values() for record in orjson.loads(obj)["Records"]]
] == expected_result


def test_commit_marker(prefix: str, worker: CloudTrailLogsWorker, symphony_storage: Path, aws_mock):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import BinaryIO
from unittest.mock import AsyncMock, MagicMock

import io
import orjson
import pytest
from faker import Faker
Expand Down