Skip to content

Commit

Permalink
Add account ID extraction and cleanup functionality in CSPM tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kooomix committed Jan 29, 2025
1 parent 69aaa72 commit 1f9ed69
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
18 changes: 17 additions & 1 deletion infrastructure/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import time
from urllib.parse import urlparse, parse_qs
from systest_utils import Logger
import re



class CloudFormationManager:
Expand Down Expand Up @@ -120,4 +122,18 @@ def delete_stack(self):

except ClientError as e:
Logger.logger.error(f"An error occurred while deleting the stack: {e}")
raise e
raise e




def extract_account_id(arn):
"""
Extracts the AWS account ID from an ARN string.
:param arn: The ARN string (e.g., "arn:aws:iam::12345678:role/armo-scan-role-cross-with_customer-12345678")
:return: The extracted account ID as a string or None if not found.
"""
match = re.search(r"arn:aws:iam::(\d+):", arn)
return match.group(1) if match else None

2 changes: 1 addition & 1 deletion system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@
"event-ingester-service"
],
"description": "Checks accounts cspm",
"skip_on_environment": "",
"skip_on_environment": "production-us",
"owner": "[email protected]"
},
"clusters": {
Expand Down
2 changes: 1 addition & 1 deletion tests_scripts/accounts/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver
"capabilities.runtimeObservability": "disable",
"capabilities.networkPolicyService": "disable",
"capabilities.seccompProfileService": "disable",
"capabilities.nodeProfileService": "disable",
"capabilities.nodeProfileService": "enable",
"capabilities.vulnerabilityScan": "disable",
"grypeOfflineDB.enabled": "false",
"capabilities.relevancy": "disabled",
Expand Down
44 changes: 42 additions & 2 deletions tests_scripts/accounts/cspm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class CSPM(Accounts):
def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver=None):
super().__init__(test_driver=test_driver, test_obj=test_obj, backend=backend, kubernetes_obj=kubernetes_obj)

self.stack_manager = None




Expand All @@ -55,6 +57,7 @@ def start(self):
"""

assert self.backend is not None, f'the test {self.test_driver.test_name} must run with backend'

stack_region = "us-east-1"
# generate random number for cloud account name for uniqueness
rand = str(random.randint(10000000, 99999999))
Expand Down Expand Up @@ -84,6 +87,10 @@ def start(self):
Logger.logger.info('Stage 3: Create bad arn cloud account with cspm')
self.create_and_validate_cloud_account_with_cspm(cloud_account_name, bad_arn, PROVIDER_AWS, region=stack_region, expect_failure=True)

account_id = aws.extract_account_id(test_arn)
self.cleanup_existing_aws_cloud_accounts(account_id)


Logger.logger.info('Stage 4: Create new arn cloud account with cspm')
self.create_and_validate_cloud_account_with_cspm(cloud_account_name, test_arn, PROVIDER_AWS, region=stack_region, expect_failure=False)

Expand Down Expand Up @@ -121,11 +128,43 @@ def start(self):


def cleanup(self, **kwargs):
self.stack_manager.delete_stack()
if self.stack_manager:
self.stack_manager.delete_stack()
return super().cleanup(**kwargs)



def cleanup_existing_aws_cloud_accounts(self, account_id):
"""
Cleanup existing aws cloud accounts.
"""

if not account_id:
raise Exception("account_id is required")

body = {
"pageSize": 100,
"pageNum": 0,
"innerFilters": [
{
"provider": PROVIDER_AWS,
"providerInfo.accountID":account_id
}
]
}
res = self.backend.get_cloud_accounts(body=body)

if "response" in res:
if len(res["response"]) == 0:
Logger.logger.info(f"No existing aws cloud accounts to cleanup for account_id {account_id}")
return
for account in res["response"]:
guid = account["guid"]
self.backend.delete_cloud_account(guid)
Logger.logger.info(f"Deleted cloud account with guid {guid} for account_id {account_id}")

return res

def get_and_validate_cspm_link(self, region) -> str:
"""
Get and validate cspm link.
Expand Down Expand Up @@ -156,7 +195,8 @@ def create_and_validate_cloud_account_with_cspm(self, cloud_account_name:str, ar
try:
res = self.backend.create_cloud_account(body=body, provider=provider)
except Exception as e:
Logger.logger.error(f"failed to create cloud account, body used: {body}, error is {e}")
if not expect_failure:
Logger.logger.error(f"failed to create cloud account, body used: {body}, error is {e}")
failed = True

assert failed == expect_failure, f"expected_failure is {expect_failure}, but failed is {failed}, body used: {body}"
Expand Down

0 comments on commit 1f9ed69

Please sign in to comment.