Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accounts tests #596

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions configurations/system/tests_cases/accounts_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
class AccountsTests(object):

@staticmethod
def accounts():
from tests_scripts.accounts.accounts import Accounts
def cspm():
from tests_scripts.accounts.cspm import CSPM
return KubescapeConfiguration(
name=inspect.currentframe().f_code.co_name,
test_obj=Accounts,
test_obj=CSPM
)

@staticmethod
def clusters():
from tests_scripts.accounts.clusters import Clusters
return KubescapeConfiguration(
name=inspect.currentframe().f_code.co_name,
test_obj=Clusters
)
123 changes: 123 additions & 0 deletions infrastructure/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import boto3
from botocore.exceptions import ClientError
import time
from urllib.parse import urlparse, parse_qs
from systest_utils import Logger


class CloudFormationManager:
def __init__(self, url, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None):
self.stack_name, self.template_url, self.region, self.parameters = self.extract_parameters_from_url(url)
self.cloudformation = boto3.client(
"cloudformation",
region_name=self.region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token
)

def extract_parameters_from_url(self, url):
parsed_url = urlparse(url)

# Parse query parameters from the query and fragment (after #)
query_params = parse_qs(parsed_url.query)
fragment_params = parse_qs(parsed_url.fragment.split("?")[-1])

# Merge query and fragment parameters
query_params.update(fragment_params)

stack_name = query_params.get("stackName", [None])[0]
template_url = query_params.get("templateUrl", [None])[0]
region = query_params.get("region", [None])[0]

# Extract parameters starting with 'param_'
parameters = [
{"ParameterKey": key.replace("param_", ""), "ParameterValue": value[0]}
for key, value in query_params.items()
if key.startswith("param_")
]

if not stack_name or not template_url or not region:
raise ValueError("The URL does not contain the required parameters 'stackName', 'templateUrl', or 'region'.")

return stack_name, template_url, region, parameters

def create_stack(self, stack_name=None):
try:
if stack_name:
self.stack_name = stack_name
# Create the stack
response = self.cloudformation.create_stack(
StackName=self.stack_name,
TemplateURL=self.template_url,
Parameters=self.parameters,
Capabilities=["CAPABILITY_NAMED_IAM"] # Required for creating IAM resources
)
Logger.logger.info(f"Stack creation initiated for: {self.stack_name}")
return response["StackId"]

except ClientError as e:
Logger.logger.error(f"An error occurred during stack creation: {e}")
return None

def wait_for_stack_creation(self, delay=15, max_attempts=80):
try:
# Wait for the stack creation to complete
Logger.logger.info(f"Waiting for stack {self.stack_name} to be created...")
waiter = self.cloudformation.get_waiter("stack_create_complete")
waiter.wait(StackName=self.stack_name,
WaiterConfig={
"Delay": delay, # Polling interval in seconds
"MaxAttempts": max_attempts # Maximum number of attempts (total time = Delay * MaxAttempts)
})
Logger.logger.info(f"Stack {self.stack_name} created successfully.")

except ClientError as e:
Logger.logger.error(f"An error occurred while waiting for stack creation: {e}")
raise e

def get_stack_output_role_arn(self):
return self.get_stack_output("RoleArn")

def get_stack_output(self, output_key):
try:
# Describe the stack to fetch outputs
response = self.cloudformation.describe_stacks(StackName=self.stack_name)

# Retrieve the outputs section
stacks = response.get("Stacks", [])
if not stacks:
Logger.logger.error("No stacks found.")
return None

# Assuming only one stack is returned (by stack_name)
stack = stacks[0]
outputs = stack.get("Outputs", [])

# Extract ARN from outputs (if available)
for output in outputs:
if output_key == output.get("OutputKey"):
return output.get("OutputValue")

Logger.logger.error(f"No output found with key '{output_key}' in stack outputs.")
return None

except ClientError as e:
Logger.logger.error(f"An error occurred: {e}")
return None

def delete_stack(self):
try:
# Delete the stack
Logger.logger.info(f"Deleting stack {self.stack_name}...")
self.cloudformation.delete_stack(StackName=self.stack_name)

# Wait for the stack deletion to complete
waiter = self.cloudformation.get_waiter("stack_delete_complete")
waiter.wait(StackName=self.stack_name)
Logger.logger.info(f"Stack {self.stack_name} deleted successfully.")

except ClientError as e:
Logger.logger.error(f"An error occurred while deleting the stack: {e}")
raise e
26 changes: 24 additions & 2 deletions infrastructure/backend_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# encoding: utf-8
import math
import subprocess
import sys
import time
import traceback
Expand All @@ -15,7 +14,6 @@

from systest_utils.tests_logger import Logger
from systest_utils.wlid import Wlid
from systest_utils import TestUtil

import json
from infrastructure.api_login import *
Expand Down Expand Up @@ -138,6 +136,8 @@ class NotExistingCustomer(Exception):
API_ACCOUNTS_CLOUD_LIST = "/api/v1/accounts/cloud/list"
API_ACCOUNTS_KUBERNETES_LIST = "/api/v1/accounts/kubernetes/list"
API_ACCOUNTS_AWS_REGIONS = "/api/v1/accounts/aws/regions"
API_ACCOUNTS_CSPM_LINK = "/api/v1/accounts/aws/cspmstack"
API_ACCOUNTS_DELETE_FEATURE = "/api/v1/accounts/feature"
API_UNIQUEVALUES_ACCOUNTS_CLOUD= "/api/v1/uniqueValues/accounts/cloud"
API_UNIQUEVALUES_ACCOUNTS_KUBERNETES = "/api/v1/uniqueValues/accounts/kubernetes"

Expand Down Expand Up @@ -2778,6 +2778,28 @@ def test_webhook_message(self, body):
self.customer, r.status_code, r.text))
return r.json()

def get_cspm_link(self, region):
url = API_ACCOUNTS_CSPM_LINK + "?region=" + region
r = self.get(url, params={"customerGUID": self.selected_tenant_id})
if not 200 <= r.status_code < 300:
raise Exception(
'Error accessing CSPM link. Customer: "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))
return r.json()

def delete_accounts_feature(self, account_guid, feature_name):
url = API_ACCOUNTS_DELETE_FEATURE
params = {"customerGUID": self.selected_tenant_id}
body = {
"guid": account_guid,
"featureName": feature_name
}
r = self.delete(url, params=params, json=body)
if not 200 <= r.status_code < 300:
raise Exception(
'Error deleting account feature. Customer: "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))
return r.json()

def get_cloud_accounts(self, body=None, **kwargs):
url = API_ACCOUNTS_CLOUD_LIST
Expand Down
24 changes: 18 additions & 6 deletions system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,19 @@
"owner": "[email protected]"
},
"workflows_configurations": {
"target": [
"Backend"
],
"target_repositories": [
"config-service-dummy",
"cadashboardbe-dummy",
"event-ingester-service-dummy"
],
"description": "Checks workflows configurations",
"skip_on_environment": "production,production-us",
"owner": "[email protected]"
},
"cspm": {
"target": [
"Backend"
],
Expand All @@ -1188,20 +1201,19 @@
"cadashboardbe",
"event-ingester-service"
],
"description": "Checks workflows configurations",
"description": "Checks accounts cspm",
"skip_on_environment": "",
"owner": "[email protected]"
},
"accounts": {
"clusters": {
"target": [
"Backend"
],
"target_repositories": [
"config-service-dummy",
"cadashboardbe-dummy",
"event-ingester-service-dummy"
"cadashboardbe",
"event-ingester-service"
],
"description": "Checks accounts",
"description": "Checks accounts clusters",
"skip_on_environment": "",
"owner": "[email protected]"
}
Expand Down
Loading
Loading