diff --git a/src/ecs-mcp-server/awslabs/ecs_mcp_server/api/security_analysis.py b/src/ecs-mcp-server/awslabs/ecs_mcp_server/api/security_analysis.py new file mode 100644 index 0000000000..1130ced07a --- /dev/null +++ b/src/ecs-mcp-server/awslabs/ecs_mcp_server/api/security_analysis.py @@ -0,0 +1,478 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +API for ECS security analysis operations. + +This module provides comprehensive security analysis for ECS clusters, +identifying misconfigurations and providing actionable recommendations. +""" + +import logging +from typing import Any, Dict, List, Optional + +from awslabs.ecs_mcp_server.api.resource_management import ecs_api_operation + +logger = logging.getLogger(__name__) + + +async def analyze_ecs_security( + cluster_names: List[str], + regions: Optional[List[str]] = None, +) -> Dict[str, Any]: + """ + Main entry point for ECS security analysis. + + Args: + cluster_names: List of cluster names to analyze (required) + regions: Optional list of regions (default: ["us-east-1"]) + + Returns: + Dictionary with analysis results and summary + """ + if not cluster_names: + return { + "status": "error", + "error": ( + "cluster_names is required. Please specify which clusters to analyze. " + "Use ecs_resource_management tool to list available clusters first." + ), + "total_clusters_analyzed": 0, + "total_recommendations": 0, + "results": [], + } + + regions = regions or ["us-east-1"] + all_results = [] + errors = [] + + for region in regions: + try: + # Analyze specified clusters + clusters_to_analyze = cluster_names + + # Analyze each cluster + for cluster_name in clusters_to_analyze: + try: + # Collect data + adapter = DataAdapter(region) + cluster_data = await adapter.collect_cluster_data(cluster_name) + + # Analyze security + analyzer = SecurityAnalyzer(cluster_name, region) + result = analyzer.analyze(cluster_data) + + all_results.append(result) + except Exception as e: + logger.error(f"Error analyzing cluster {cluster_name} in {region}: {e}") + errors.append( + { + "cluster": cluster_name, + "region": region, + "error": str(e), + } + ) + except Exception as e: + logger.error(f"Error processing region {region}: {e}") + errors.append( + { + "region": region, + "error": str(e), + } + ) + + # Calculate totals + total_recommendations = sum(len(r.get("recommendations", [])) for r in all_results) + + response = { + "status": "success" if all_results else "error", + "total_clusters_analyzed": len(all_results), + "total_recommendations": total_recommendations, + "results": all_results, + } + + if errors: + response["errors"] = errors + + return response + + +async def _discover_clusters(region: str) -> Dict[str, Any]: + """ + Discover all clusters in a region. + + Args: + region: AWS region + + Returns: + Dictionary with list of cluster names or error + """ + try: + response = await ecs_api_operation("ListClusters", {}) + + if "error" in response: + return {"error": response["error"]} + + cluster_arns = response.get("clusterArns", []) + # Extract cluster names from ARNs + cluster_names = [arn.split("/")[-1] for arn in cluster_arns] + + return {"clusters": cluster_names} + except Exception as e: + logger.error(f"Error discovering clusters in {region}: {e}") + return {"error": str(e)} + + +class DataAdapter: + """Adapter that uses existing MCP tools to collect ECS data.""" + + def __init__(self, region: str): + """ + Initialize DataAdapter. + + Args: + region: AWS region + """ + self.region = region + + async def collect_cluster_data(self, cluster_name: str) -> Dict[str, Any]: + """ + Collect cluster data using existing ECS API operations. + + Args: + cluster_name: Name of the cluster + + Returns: + Dictionary with cluster data or error + """ + try: + response = await ecs_api_operation( + "DescribeClusters", + {"clusters": [cluster_name], "include": ["SETTINGS", "CONFIGURATIONS"]}, + ) + + if "error" in response: + return {"error": response["error"], "cluster_name": cluster_name} + + clusters = response.get("clusters", []) + if not clusters: + return { + "error": f"Cluster {cluster_name} not found", + "cluster_name": cluster_name, + } + + return {"status": "success", "cluster": clusters[0]} + except Exception as e: + logger.error(f"Error collecting cluster data for {cluster_name}: {e}") + return {"error": str(e), "cluster_name": cluster_name} + + +class SecurityAnalyzer: + """Security analysis engine for ECS resources.""" + + def __init__(self, cluster_name: str, region: str): + """ + Initialize SecurityAnalyzer. + + Args: + cluster_name: Name of the cluster being analyzed + region: AWS region + """ + self.cluster_name = cluster_name + self.region = region + self.recommendations = [] + + def _add_recommendation( + self, + title: str, + severity: str, + category: str, + resource: str, + issue: str, + recommendation: str, + remediation_steps: List[str], + documentation_links: List[str], + resource_type: str = "Cluster", + ) -> None: + """ + Add a security recommendation with consistent structure. + + Args: + title: Brief title of the issue + severity: Severity level (High, Medium, Low) + category: Category of the issue + resource: Resource name + issue: Description of the issue + recommendation: Recommended action + remediation_steps: List of CLI commands or steps + documentation_links: List of AWS documentation URLs + resource_type: Type of resource (default: Cluster) + """ + self.recommendations.append( + { + "title": title, + "severity": severity, + "category": category, + "resource": resource, + "resource_type": resource_type, + "cluster_name": self.cluster_name, + "region": self.region, + "issue": issue, + "recommendation": recommendation, + "remediation_steps": remediation_steps, + "documentation_links": documentation_links, + } + ) + + def analyze(self, ecs_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Main analysis orchestrator. + + Args: + ecs_data: Dictionary containing ECS resource data + + Returns: + Dictionary with analysis results + """ + self.recommendations = [] + + if "error" in ecs_data: + return { + "status": "error", + "error": ecs_data["error"], + "cluster_name": ecs_data.get("cluster_name", "unknown"), + "region": self.region, + "recommendations": [], + "summary": {"total_issues": 0, "by_severity": {}, "by_category": {}}, + } + + cluster_data = ecs_data.get("cluster", {}) + + # Run security checks (will be implemented in subsequent subtasks) + self._analyze_cluster_security(cluster_data) + self._analyze_logging_security(cluster_data) + + # Generate summary + summary = self._generate_summary() + + return { + "status": "success", + "cluster_name": cluster_data.get("clusterName", "unknown"), + "region": self.region, + "recommendations": self.recommendations, + "summary": summary, + } + + def _analyze_cluster_security(self, cluster: Dict[str, Any]) -> None: + """ + Analyze cluster-level security. + + Checks: + - Container Insights configuration + - Execute command logging settings + - Cluster status and availability + + Args: + cluster: Cluster data dictionary + """ + cluster_name = cluster.get("clusterName", "unknown") + + # Check Container Insights + settings = cluster.get("settings", []) + container_insights_enabled = any( + s.get("name") == "containerInsights" and s.get("value") == "enabled" for s in settings + ) + + if not container_insights_enabled: + self._add_recommendation( + title="Container Insights Disabled", + severity="Medium", + category="Monitoring", + resource=cluster_name, + issue="Container Insights is not enabled for this cluster", + recommendation=( + "Enable Container Insights to collect metrics and logs from your " + "containerized applications and microservices" + ), + remediation_steps=[ + f"aws ecs update-cluster-settings --cluster {cluster_name} " + "--settings name=containerInsights,value=enabled" + ], + documentation_links=[ + "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/" + "cloudwatch-container-insights.html" + ], + ) + + # Check execute command logging + configuration = cluster.get("configuration", {}) + exec_config = configuration.get("executeCommandConfiguration", {}) + logging_config = exec_config.get("logging", "NONE") + + if logging_config == "NONE" or logging_config == "DEFAULT": + severity = "High" if logging_config == "NONE" else "Medium" + self._add_recommendation( + title="Execute Command Logging Not Configured", + severity=severity, + category="Logging", + resource=cluster_name, + issue=( + f"Execute command logging is set to {logging_config}. " + "This means ECS Exec sessions are not being logged." + ), + recommendation=( + "Configure execute command logging to CloudWatch Logs or S3 " + "to maintain audit trails of interactive sessions" + ), + remediation_steps=[ + f"aws ecs update-cluster --cluster {cluster_name} " + "--configuration executeCommandConfiguration=" + "{logging=OVERRIDE,logConfiguration={cloudWatchLogGroupName=" + f"/aws/ecs/{cluster_name}/exec}}" + ], + documentation_links=[ + "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-exec.html" + ], + ) + + # Check cluster status + status = cluster.get("status", "UNKNOWN") + if status != "ACTIVE": + self._add_recommendation( + title="Cluster Not Active", + severity="High", + category="Availability", + resource=cluster_name, + issue=f"Cluster status is {status}, not ACTIVE", + recommendation=( + "Investigate why the cluster is not in ACTIVE state. " + "This may indicate a configuration or resource issue." + ), + remediation_steps=[ + f"aws ecs describe-clusters --clusters {cluster_name} " + "--include SETTINGS,CONFIGURATIONS" + ], + documentation_links=[ + "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/clusters.html" + ], + ) + + def _analyze_logging_security(self, cluster: Dict[str, Any]) -> None: + """ + Analyze logging security. + + Checks: + - CloudWatch logging configuration + - Log retention policies + + Args: + cluster: Cluster data dictionary + """ + cluster_name = cluster.get("clusterName", "unknown") + + # Check execute command logging configuration (detailed check) + configuration = cluster.get("configuration", {}) + exec_config = configuration.get("executeCommandConfiguration", {}) + log_config = exec_config.get("logConfiguration", {}) + + # Check if CloudWatch log group is configured + cw_log_group = log_config.get("cloudWatchLogGroupName") + if not cw_log_group: + self._add_recommendation( + title="CloudWatch Log Group Not Configured for ECS Exec", + severity="Medium", + category="Logging", + resource=cluster_name, + issue=( + "CloudWatch log group is not configured for ECS Exec sessions. " + "This limits audit capabilities." + ), + recommendation=( + "Configure a CloudWatch log group to capture ECS Exec session logs " + "for security auditing and compliance" + ), + remediation_steps=[ + "# First, create a CloudWatch log group", + f"aws logs create-log-group --log-group-name /aws/ecs/{cluster_name}/exec", + "", + "# Then update the cluster configuration", + f"aws ecs update-cluster --cluster {cluster_name} " + "--configuration executeCommandConfiguration=" + "{logging=OVERRIDE,logConfiguration={cloudWatchLogGroupName=" + f"/aws/ecs/{cluster_name}/exec}}", + ], + documentation_links=[ + "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/" + "ecs-exec.html#ecs-exec-logging" + ], + ) + + # Check if log encryption is enabled + cw_encryption_enabled = log_config.get("cloudWatchEncryptionEnabled", False) + if cw_log_group and not cw_encryption_enabled: + self._add_recommendation( + title="CloudWatch Logs Encryption Not Enabled", + severity="Medium", + category="Logging", + resource=cluster_name, + issue=( + "CloudWatch logs encryption is not enabled for ECS Exec sessions. " + "Logs may contain sensitive information." + ), + recommendation=( + "Enable CloudWatch logs encryption to protect sensitive data " + "in ECS Exec session logs" + ), + remediation_steps=[ + f"aws ecs update-cluster --cluster {cluster_name} " + "--configuration executeCommandConfiguration=" + "{logging=OVERRIDE,logConfiguration={cloudWatchLogGroupName=" + f"{cw_log_group},cloudWatchEncryptionEnabled=true}}", + ], + documentation_links=[ + "https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/" + "encrypt-log-data-kms.html" + ], + ) + + def _generate_summary(self) -> Dict[str, Any]: + """ + Generate summary statistics. + + Calculates: + - Total issues by severity (High/Medium/Low) + - Issues by category + - Issues by cluster + + Returns: + Dictionary with summary statistics + """ + by_severity = {"High": 0, "Medium": 0, "Low": 0} + by_category = {} + + for rec in self.recommendations: + # Count by severity + severity = rec.get("severity", "Unknown") + if severity in by_severity: + by_severity[severity] += 1 + + # Count by category + category = rec.get("category", "Unknown") + by_category[category] = by_category.get(category, 0) + 1 + + return { + "total_issues": len(self.recommendations), + "by_severity": by_severity, + "by_category": by_category, + } diff --git a/src/ecs-mcp-server/awslabs/ecs_mcp_server/main.py b/src/ecs-mcp-server/awslabs/ecs_mcp_server/main.py index a1dbaa58a9..1cf4272473 100755 --- a/src/ecs-mcp-server/awslabs/ecs_mcp_server/main.py +++ b/src/ecs-mcp-server/awslabs/ecs_mcp_server/main.py @@ -31,6 +31,7 @@ deployment_status, infrastructure, resource_management, + security_analysis, troubleshooting, ) from awslabs.ecs_mcp_server.utils.config import get_config @@ -133,6 +134,7 @@ def _create_ecs_mcp_server() -> Tuple[FastMCP, Dict[str, Any]]: deployment_status.register_module(mcp) resource_management.register_module(mcp) troubleshooting.register_module(mcp) + security_analysis.register_module(mcp) delete.register_module(mcp) # Register all proxies diff --git a/src/ecs-mcp-server/awslabs/ecs_mcp_server/modules/security_analysis.py b/src/ecs-mcp-server/awslabs/ecs_mcp_server/modules/security_analysis.py new file mode 100644 index 0000000000..083f10cfc6 --- /dev/null +++ b/src/ecs-mcp-server/awslabs/ecs_mcp_server/modules/security_analysis.py @@ -0,0 +1,198 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Security Analysis module for ECS MCP Server. +This module provides comprehensive security analysis for ECS clusters. +""" + +import logging +from typing import Any, Dict, List, Optional + +from fastmcp import FastMCP +from pydantic import Field + +from awslabs.ecs_mcp_server.api.security_analysis import analyze_ecs_security + +logger = logging.getLogger(__name__) + + +def register_module(mcp: FastMCP) -> None: + """Register security analysis module tools and prompts with the MCP server.""" + + @mcp.tool(name="analyze_ecs_security", annotations=None) + async def mcp_analyze_ecs_security( + cluster_names: List[str] = Field( # noqa: B008 + ..., + description=( + "REQUIRED: List of ECS cluster names to analyze. " + "User must explicitly specify which clusters to analyze. " + "Example: ['my-cluster', 'prod-cluster']" + ), + ), + regions: Optional[List[str]] = Field( # noqa: B008 + default=None, + description=( + "List of AWS regions where the clusters are located. " + "Defaults to ['us-east-1'] if not specified. " + "Example: ['us-east-1', 'us-west-2']" + ), + ), + ) -> Dict[str, Any]: + """ + Analyze ECS cluster security configurations and provide recommendations. + + Use this tool when you need to assess the security posture of ECS clusters, + identify security misconfigurations, and get actionable remediation steps. + + IMPORTANT - REQUIRED USER INTERACTION BEFORE CALLING THIS TOOL: + + STEP 1 - ASK FOR REGION FIRST: + You MUST ask the user: "Which AWS region would you like to analyze for ECS security issues?" + - Provide common options: us-east-1 (default), us-west-2, eu-west-1, ap-southeast-1 + - Wait for user to specify the region + + STEP 2 - LIST CLUSTERS IN THAT REGION: + Use ecs_resource_management tool with ListClusters for the user-specified region + Show the user: "I found these clusters in {region}: [list]" + + STEP 3 - ASK USER TO SELECT CLUSTERS: + Ask: "Which cluster(s) would you like me to analyze?" + Allow single or multiple selections + + STEP 4 - CALL THIS TOOL: + Only after completing steps 1-3, call this tool with selected clusters and region + + STEP 5 - AFTER ANALYSIS: + Ask: "Would you like to analyze clusters in a different region?" + If yes, repeat from STEP 1 + + USAGE EXAMPLES: + 1. Analyze specific clusters in default region (us-east-1): + cluster_names: ["my-cluster", "prod-cluster"] + + 2. Analyze specific clusters in specific region: + cluster_names: ["my-cluster"] + regions: ["us-west-2"] + + 3. Analyze same-named clusters across multiple regions: + cluster_names: ["prod-cluster"] + regions: ["us-east-1", "us-west-2"] + Note: This will look for "prod-cluster" in both regions + + MULTI-REGION WORKFLOW: + - Analyze one region at a time for better user experience + - After showing results, ask: "Would you like to analyze another region?" + - If yes, repeat the workflow for the new region + - This allows users to focus on one region's issues before moving to the next + + WORKFLOW: + 1. List available clusters using ecs_resource_management tool + 2. Ask user to select which clusters to analyze + 3. Run this tool with the selected cluster names + 4. Review the security recommendations organized by severity and category + 5. Follow the remediation steps provided for each security issue + 6. Re-run the analysis after implementing fixes to verify improvements + + The analysis includes: + - Container Insights monitoring configuration + - Execute command logging settings + - Cluster status and availability + - CloudWatch logging configuration + - Log encryption settings + + Parameters: + cluster_names: REQUIRED list of cluster names to analyze. + User must explicitly select which clusters to analyze. + Example: ["my-cluster", "prod-cluster"] + + regions: Optional list of AWS regions where clusters are located. + Defaults to ["us-east-1"] if not specified. + Example: ["us-east-1", "us-west-2"] + + Returns: + Dictionary containing: + - status: "success" or "error" + - total_clusters_analyzed: Number of clusters analyzed + - total_recommendations: Total number of security recommendations + - results: List of analysis results per cluster with recommendations and summary + + PRESENTATION GUIDELINES: + + STRUCTURE YOUR RESPONSE IN TWO SECTIONS: + + SECTION 1 - EXECUTIVE SUMMARY (Show this first): + ``` + ## Security Analysis Results for {cluster_name} + + Summary: Found {total} security recommendations + 🔴 High: {count} issues + 🟠 Medium: {count} issues + 🟡 Low: {count} issues + + ### Issues Found: + 1. 🔴 [HIGH] Execute Command Logging Not Configured + - ECS Exec sessions are not being logged (NONE setting) + - Cluster: my-ecs-cluster | Region: us-east-1 + + 2. 🟠 [MEDIUM] CloudWatch Log Group Not Configured + - No CloudWatch log group for ECS Exec audit trails + - Cluster: my-ecs-cluster | Region: us-east-1 + ``` + + SECTION 2 - DETAILED REMEDIATION (Show this after summary): + For each issue, provide: + - Full explanation of the security risk + - Step-by-step remediation commands + - AWS documentation links + + FORMATTING RULES: + 1. Use AWS Trusted Advisor color coding: + 🔴 High (critical), 🟠 Medium (important), 🟡 Low (minor) + 2. Group by severity: High → Medium → Low + 3. Show resource hierarchy: Cluster: {name} | Region: {region} + 4. Include CLI commands in code blocks + 5. Link to AWS documentation for each issue type + + CRITICAL - YOU MUST ASK FOR REGION FIRST: + + DO NOT automatically list clusters without asking for region first! + + Correct workflow: + 1. Ask: "Which AWS region? (us-east-1, us-west-2, eu-west-1, etc.)" + 2. User specifies region (e.g., "us-west-2") + 3. List clusters in that specific region + 4. Ask user to select clusters + 5. Run this tool with selected clusters and region + 6. After results, ask: "Analyze another region?" + + Wrong workflow (DO NOT DO THIS): + ❌ Listing clusters without asking for region first + ❌ Assuming us-east-1 without asking user + ❌ Not offering to check other regions after analysis + """ + logger.info(f"Security analysis requested - clusters: {cluster_names}, regions: {regions}") + return await analyze_ecs_security(cluster_names=cluster_names, regions=regions) + + @mcp.prompt("analyze ecs security") + def security_analysis_prompt() -> List[str]: + """User wants to analyze ECS security""" + logger.info("Security analysis prompt triggered") + return ["analyze_ecs_security"] + + @mcp.prompt("check ecs security") + def security_check_prompt() -> List[str]: + """User wants to check ECS security""" + logger.info("Security check prompt triggered") + return ["analyze_ecs_security"] diff --git a/src/ecs-mcp-server/tests/unit/test_security_analysis.py b/src/ecs-mcp-server/tests/unit/test_security_analysis.py new file mode 100644 index 0000000000..00d3c64140 --- /dev/null +++ b/src/ecs-mcp-server/tests/unit/test_security_analysis.py @@ -0,0 +1,405 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ECS security analysis functionality.""" + +from unittest.mock import patch + +import pytest + +from awslabs.ecs_mcp_server.api.security_analysis import ( + DataAdapter, + SecurityAnalyzer, + _discover_clusters, + analyze_ecs_security, +) + +# ---------------------------------------------------------------------------- +# Test Fixtures +# ---------------------------------------------------------------------------- + + +@pytest.fixture +def secure_cluster(): + """Cluster with all security features enabled.""" + return { + "clusterName": "secure-cluster", + "status": "ACTIVE", + "settings": [{"name": "containerInsights", "value": "enabled"}], + "configuration": { + "executeCommandConfiguration": { + "logging": "OVERRIDE", + "logConfiguration": { + "cloudWatchLogGroupName": "/aws/ecs/secure-cluster/exec", + "cloudWatchEncryptionEnabled": True, + }, + } + }, + } + + +@pytest.fixture +def insecure_cluster(): + """Cluster with multiple security issues.""" + return { + "clusterName": "insecure-cluster", + "status": "ACTIVE", + "settings": [], + "configuration": {"executeCommandConfiguration": {"logging": "NONE"}}, + } + + +# ---------------------------------------------------------------------------- +# DataAdapter Tests +# ---------------------------------------------------------------------------- + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_data_adapter_success(mock_api): + """Test successful cluster data collection.""" + mock_api.return_value = { + "clusters": [{"clusterName": "test", "status": "ACTIVE", "settings": []}] + } + + adapter = DataAdapter("us-east-1") + result = await adapter.collect_cluster_data("test") + + assert result["status"] == "success" + assert result["cluster"]["clusterName"] == "test" + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +@pytest.mark.parametrize( + "api_response,expected_error", + [ + ({"clusters": []}, "not found"), + ({"error": "AccessDenied"}, "AccessDenied"), + ], +) +async def test_data_adapter_errors(mock_api, api_response, expected_error): + """Test DataAdapter error handling.""" + mock_api.return_value = api_response + + adapter = DataAdapter("us-east-1") + result = await adapter.collect_cluster_data("test") + + assert "error" in result + assert expected_error in result["error"] + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_data_adapter_exception(mock_api): + """Test DataAdapter exception handling.""" + mock_api.side_effect = Exception("Network error") + + adapter = DataAdapter("us-east-1") + result = await adapter.collect_cluster_data("test") + + assert "error" in result + + +# ---------------------------------------------------------------------------- +# SecurityAnalyzer Tests +# ---------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "cluster_data,expected_rec_count,expected_high,expected_medium", + [ + ( + { + "clusterName": "test", + "status": "ACTIVE", + "settings": [], + "configuration": {"executeCommandConfiguration": {"logging": "NONE"}}, + }, + 3, + 1, + 2, + ), # No insights, no logging, no log group + ( + { + "clusterName": "test", + "status": "INACTIVE", + "settings": [], + "configuration": {"executeCommandConfiguration": {"logging": "DEFAULT"}}, + }, + 4, + 1, + 3, + ), # Inactive + no insights + default logging + no log group + ], +) +def test_security_analyzer_recommendations( + cluster_data, expected_rec_count, expected_high, expected_medium +): + """Test SecurityAnalyzer generates correct recommendations.""" + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"cluster": cluster_data}) + + assert result["status"] == "success" + assert len(result["recommendations"]) == expected_rec_count + assert result["summary"]["by_severity"]["High"] == expected_high + assert result["summary"]["by_severity"]["Medium"] == expected_medium + + +def test_security_analyzer_secure_cluster(secure_cluster): + """Test that secure cluster generates no recommendations.""" + analyzer = SecurityAnalyzer("secure", "us-east-1") + result = analyzer.analyze({"cluster": secure_cluster}) + + assert result["status"] == "success" + assert len(result["recommendations"]) == 0 + assert result["summary"]["total_issues"] == 0 + + +def test_security_analyzer_error_handling(): + """Test SecurityAnalyzer handles error data.""" + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"error": "Cluster not found", "cluster_name": "test"}) + + assert result["status"] == "error" + assert "error" in result + assert len(result["recommendations"]) == 0 + + +@pytest.mark.parametrize( + "settings,expected_rec", + [ + ([], True), # No Container Insights + ([{"name": "containerInsights", "value": "disabled"}], True), + ([{"name": "containerInsights", "value": "enabled"}], False), + ], +) +def test_container_insights_check(settings, expected_rec): + """Test Container Insights detection.""" + cluster = { + "clusterName": "test", + "status": "ACTIVE", + "settings": settings, + "configuration": {"executeCommandConfiguration": {"logging": "OVERRIDE"}}, + } + + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"cluster": cluster}) + + insights_recs = [r for r in result["recommendations"] if "Container Insights" in r["title"]] + assert (len(insights_recs) > 0) == expected_rec + + +@pytest.mark.parametrize( + "logging_config,expected_severity", + [ + ("NONE", "High"), + ("DEFAULT", "Medium"), + ("OVERRIDE", None), + ], +) +def test_exec_logging_check(logging_config, expected_severity): + """Test execute command logging detection.""" + cluster = { + "clusterName": "test", + "status": "ACTIVE", + "settings": [{"name": "containerInsights", "value": "enabled"}], + "configuration": {"executeCommandConfiguration": {"logging": logging_config}}, + } + + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"cluster": cluster}) + + exec_recs = [r for r in result["recommendations"] if "Execute Command Logging" in r["title"]] + + if expected_severity: + assert len(exec_recs) > 0 + assert exec_recs[0]["severity"] == expected_severity + else: + assert len(exec_recs) == 0 + + +def test_cloudwatch_encryption_check(): + """Test CloudWatch encryption detection.""" + cluster = { + "clusterName": "test", + "status": "ACTIVE", + "settings": [{"name": "containerInsights", "value": "enabled"}], + "configuration": { + "executeCommandConfiguration": { + "logging": "OVERRIDE", + "logConfiguration": { + "cloudWatchLogGroupName": "/aws/ecs/test/exec", + "cloudWatchEncryptionEnabled": False, + }, + } + }, + } + + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"cluster": cluster}) + + enc_recs = [r for r in result["recommendations"] if "Encryption" in r["title"]] + assert len(enc_recs) == 1 + assert enc_recs[0]["severity"] == "Medium" + + +def test_recommendation_structure(insecure_cluster): + """Test all recommendations have required fields.""" + analyzer = SecurityAnalyzer("test", "us-east-1") + result = analyzer.analyze({"cluster": insecure_cluster}) + + required_fields = [ + "title", + "severity", + "category", + "resource", + "issue", + "recommendation", + "remediation_steps", + "documentation_links", + ] + + for rec in result["recommendations"]: + for field in required_fields: + assert field in rec + assert isinstance(rec["remediation_steps"], list) + assert len(rec["remediation_steps"]) > 0 + + +# ---------------------------------------------------------------------------- +# Integration Tests +# ---------------------------------------------------------------------------- + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_analyze_ecs_security_success(mock_api, insecure_cluster): + """Test main analyze_ecs_security function.""" + mock_api.return_value = {"clusters": [insecure_cluster]} + + result = await analyze_ecs_security(cluster_names=["test"], regions=["us-east-1"]) + + assert result["status"] == "success" + assert result["total_clusters_analyzed"] == 1 + assert result["total_recommendations"] > 0 + + +@pytest.mark.anyio +async def test_analyze_requires_cluster_names(): + """Test that cluster_names is required.""" + result = await analyze_ecs_security(cluster_names=[], regions=["us-east-1"]) + + assert result["status"] == "error" + assert "cluster_names is required" in result["error"] + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_analyze_multiple_clusters(mock_api): + """Test analyzing multiple clusters.""" + + def mock_side_effect(operation, params): + cluster_name = params["clusters"][0] + return { + "clusters": [ + { + "clusterName": cluster_name, + "status": "ACTIVE", + "settings": [], + "configuration": {"executeCommandConfiguration": {"logging": "NONE"}}, + } + ] + } + + mock_api.side_effect = mock_side_effect + + result = await analyze_ecs_security( + cluster_names=["cluster1", "cluster2"], regions=["us-east-1"] + ) + + assert result["status"] == "success" + assert result["total_clusters_analyzed"] == 2 + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_analyze_with_errors(mock_api): + """Test error handling during analysis.""" + mock_api.return_value = {"error": "AccessDenied"} + + result = await analyze_ecs_security(cluster_names=["test"], regions=["us-east-1"]) + + assert result["status"] == "success" + assert len(result["results"]) == 1 + assert result["results"][0]["status"] == "error" + + +@pytest.mark.anyio +async def test_analyze_with_exception(): + """Test exception handling.""" + with patch.object(DataAdapter, "collect_cluster_data", side_effect=Exception("Error")): + result = await analyze_ecs_security(cluster_names=["test"], regions=["us-east-1"]) + + assert "errors" in result + assert len(result["errors"]) > 0 + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_discover_clusters(mock_api): + """Test cluster discovery function.""" + mock_api.return_value = { + "clusterArns": [ + "arn:aws:ecs:us-east-1:123:cluster/c1", + "arn:aws:ecs:us-east-1:123:cluster/c2", + ] + } + + result = await _discover_clusters("us-east-1") + + assert "clusters" in result + assert len(result["clusters"]) == 2 + assert result["clusters"][0] == "c1" + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +@pytest.mark.parametrize( + "api_response", + [ + {"error": "AccessDenied"}, + Exception("Network error"), + ], +) +async def test_discover_clusters_errors(mock_api, api_response): + """Test cluster discovery error handling.""" + if isinstance(api_response, Exception): + mock_api.side_effect = api_response + else: + mock_api.return_value = api_response + + result = await _discover_clusters("us-east-1") + assert "error" in result + + +@pytest.mark.anyio +@patch("awslabs.ecs_mcp_server.api.security_analysis.ecs_api_operation") +async def test_default_region(mock_api, secure_cluster): + """Test default region is us-east-1.""" + mock_api.return_value = {"clusters": [secure_cluster]} + + result = await analyze_ecs_security(cluster_names=["test"], regions=None) + + assert result["results"][0]["region"] == "us-east-1"