Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AWS/asset_connector/device_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sekoia_automation.asset_connector.models.ocsf.group import Group
from sekoia_automation.asset_connector.models.ocsf.organization import Organization
from sekoia_automation.storage import PersistentJSON

from aws_helpers.base import AWSModule


Expand Down
1 change: 1 addition & 0 deletions AWS/asset_connector/users_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UserTypeStr,
)
from sekoia_automation.storage import PersistentJSON

from aws_helpers.base import AWSModule


Expand Down
1 change: 1 addition & 0 deletions AWS/aws_helpers/account_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import boto3
from sekoia_automation.account_validator import AccountValidator

from aws_helpers.base import AWSModule


Expand Down
2 changes: 1 addition & 1 deletion AWS/aws_helpers/s3_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic.v1 import Field
from sekoia_automation.aio.helpers.aws.client import AwsClient, AwsConfiguration

from aws_helpers.utils import is_gzip_compressed, async_gzip_open, AsyncReader
from aws_helpers.utils import AsyncReader, async_gzip_open, is_gzip_compressed


class S3Configuration(AwsConfiguration):
Expand Down
1 change: 0 additions & 1 deletion AWS/aws_helpers/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import io
import gzip
from abc import abstractmethod
from concurrent.futures import Executor
Expand Down
2 changes: 1 addition & 1 deletion AWS/connectors/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from aws_helpers.s3_wrapper import S3Configuration, S3Wrapper
from aws_helpers.sqs_wrapper import SqsConfiguration, SqsWrapper
from aws_helpers.utils import normalize_s3_key, AsyncReader
from aws_helpers.utils import AsyncReader, normalize_s3_key
from connectors import AbstractAwsConnector, AbstractAwsConnectorConfiguration
from connectors.metrics import INCOMING_EVENTS

Expand Down
27 changes: 19 additions & 8 deletions AWS/connectors/s3/logs/base.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I doubt about the changes.

The triggers (CloudTrailLogsTrigger and FlowlogRecordsTrigger), related to this code, are deprecated.

module.register(CloudTrailLogsTrigger, "cloudtrail_logs_trigger")
module.register(FlowlogRecordsTrigger, "flowlog_records_trigger")

"docker_parameters": "cloudtrail_logs_trigger",
"name": "Fetch CloudTrail logs (deprecated)",

"docker_parameters": "flowlog_records_trigger",
"name": "Fetch Flowlog records (deprecated)",

Original file line number Diff line number Diff line change
Expand Up @@ -185,39 +185,49 @@ def _chunk_events(events: Sequence[Any], chunk_size: int) -> Generator[list[Any]
"""
chunk: list[Any] = []

# in order to reduce pressure for len(chunk) add simple counter
counter = 0

# iter over the events
for event in events:
# if the chnuk is full
if len(chunk) >= chunk_size:
# if the chunk is full
if counter >= chunk_size:
# yield the current chunk and create a new one
yield chunk
chunk = []
counter = 0

# add the event to the current chunk
chunk.append(event)
counter += 1

# if the last chunk is not empty
if len(chunk) > 0:
# yield the last chunk
yield chunk

def forward_events(self) -> None:
def forward_events(self) -> int:
# get next objects
objects = self._fetch_next_objects(self.marker)
total_records = 0

# get and forward events
try:
events = self._fetch_events(self.bucket_name, self._move_marker(objects))
chunks = self._chunk_events(list(events), self.configuration.chunk_size or 10000)
for records in chunks:
self.log(message=f"forwarding {len(records)} records", level="info")
records_count = len(records)
total_records += records_count
self.log(message=f"forwarding {records_count} records", level="info")
self.send_records(
records=list(records),
event_name=f"{self.trigger.name.lower().replace(' ', '-')}_{str(time.time())}",
)
except Exception as ex:
self.log_exception(ex, message=f"Failed to forward events from {self.bucket_name}")

return total_records

def stop(self) -> None:
self.alive.set()

Expand All @@ -238,13 +248,14 @@ def run(self) -> None:
while not self.alive.is_set():
try:
self.commit_marker()
self.forward_events()
forwarded_records = self.forward_events()
if forwarded_records == 0:
# Sleep only if there is no records to forward from current fetch
time.sleep(self.configuration.frequency)
except Exception as ex:
self.log_exception(ex, message="An unknown exception occurred")
raise

time.sleep(self.configuration.frequency)


class AwsS3FetcherConfiguration(BaseModel):
frequency: int = 60
Expand Down Expand Up @@ -320,7 +331,7 @@ def run(self) -> None:
try:
while True:
self.manage_workers()
time.sleep(900)
# time.sleep(900) TODO: Check if this is necessary
finally:
self.log(message=f"Stopping {self.name} Trigger", level="info")

Expand Down
6 changes: 3 additions & 3 deletions AWS/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from sekoia_automation.loguru.config import init_logging

from asset_connector.device_assets import AwsDeviceAssetConnector
from asset_connector.users_assets import AwsUsersAssetConnector
from aws_helpers.account_validator import AwsAccountValidator
from connectors import AwsModule
from connectors.s3.logs.trigger_cloudtrail_logs import CloudTrailLogsTrigger
from connectors.s3.logs.trigger_flowlog_records import FlowlogRecordsTrigger
Expand All @@ -12,9 +15,6 @@
from connectors.s3.trigger_s3_ocsf_parquet import AwsS3OcsfTrigger
from connectors.s3.trigger_s3_records import AwsS3RecordsTrigger
from connectors.trigger_sqs_messages import AwsSqsMessagesTrigger
from asset_connector.device_assets import AwsDeviceAssetConnector
from asset_connector.users_assets import AwsUsersAssetConnector
from aws_helpers.account_validator import AwsAccountValidator

if __name__ == "__main__":
init_logging()
Expand Down
6 changes: 4 additions & 2 deletions AWS/tests/asset_connector/test_account_validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pytest
from unittest.mock import Mock, patch

import pytest

from aws_helpers.account_validator import AwsAccountValidator
from aws_helpers.base import AWSModule, AWSConfiguration
from aws_helpers.base import AWSConfiguration, AWSModule


class TestAwsAccountValidator:
Expand Down
17 changes: 9 additions & 8 deletions AWS/tests/asset_connector/test_device_assets.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import pytest
from unittest import mock
from datetime import datetime
from unittest import mock

import pytest
import pytz
from botocore.exceptions import ClientError, NoCredentialsError
from dateutil.parser import isoparse
from sekoia_automation.asset_connector.models.ocsf.device import (
Device,
DeviceOCSFModel,
OperatingSystem,
OSTypeId,
OSTypeStr,
DeviceTypeStr,
DeviceTypeId,
DeviceTypeStr,
NetworkInterface,
NetworkInterfaceTypeId,
NetworkInterfaceTypeStr,
OperatingSystem,
OSTypeId,
OSTypeStr,
)
from sekoia_automation.asset_connector.models.ocsf.group import Group
from sekoia_automation.asset_connector.models.ocsf.organization import Organization
from sekoia_automation.module import Module
from dateutil.parser import isoparse

from asset_connector.device_assets import AwsDevice, AwsDeviceAssetConnector
from connectors import AwsModule, AwsModuleConfiguration
from asset_connector.device_assets import AwsDeviceAssetConnector, AwsDevice


@pytest.fixture
Expand Down
12 changes: 7 additions & 5 deletions AWS/tests/asset_connector/test_users_assets.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import pytest
from unittest import mock

import pytest
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError
from dateutil.parser import isoparse
from sekoia_automation.asset_connector.models.ocsf.user import UserOCSFModel
from sekoia_automation.module import Module
from dateutil.parser import isoparse
from botocore.exceptions import NoCredentialsError, ClientError, BotoCoreError

from asset_connector.users_assets import AwsUser, AwsUsersAssetConnector
from connectors import AwsModule, AwsModuleConfiguration
from asset_connector.users_assets import AwsUsersAssetConnector, AwsUser


@pytest.fixture
Expand Down Expand Up @@ -199,9 +200,10 @@ def test_update_checkpoint_integration(test_aws_users_asset_connector):
# Test AwsUser class
def test_aws_user_initialization():
"""Test AwsUser class initialization."""
from sekoia_automation.asset_connector.models.ocsf.user import User, Account, AccountTypeStr, AccountTypeId
from datetime import datetime

from sekoia_automation.asset_connector.models.ocsf.user import Account, AccountTypeId, AccountTypeStr, User

# Create a test user
account = Account(
name="testuser",
Expand Down
7 changes: 5 additions & 2 deletions AWS/tests/connectors/s3/logs/test_cloudtrail_logs_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,22 @@ def test_forward_events(
aws_mock:
"""
with mocked_client.handler_for("s3", S3Mock):
worker.forward_events()
total_events = worker.forward_events()
calls = {
call.kwargs["event_name"]: call.kwargs
for call in trigger.send_event.call_args_list
if call.kwargs.get("event_name")
}
expected_result = [record for obj in S3Objects.values() for record in orjson.loads(obj)["Records"]]

assert total_events == len(expected_result)
assert len(calls) == 1
assert all([name.startswith("aws-cloudtrail_") for name in calls.keys()])
assert [
record
for call in calls.values()
for record in read_file(symphony_storage, call["directory"], call["event"]["records_path"])
] == [record for obj in S3Objects.values() for record in orjson.loads(obj)["Records"]]
] == expected_result


def test_commit_marker(prefix: str, worker: CloudTrailLogsWorker, symphony_storage: Path, aws_mock):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import BinaryIO
from unittest.mock import AsyncMock, MagicMock

import io
import orjson
import pytest
from faker import Faker
Expand Down