diff --git a/backend/engine/tests/test_engine_utils.py b/backend/engine/tests/test_engine_utils.py index 0330c5906..fbdb71664 100644 --- a/backend/engine/tests/test_engine_utils.py +++ b/backend/engine/tests/test_engine_utils.py @@ -2,7 +2,7 @@ import os from typing import Any import unittest -from unittest.mock import patch +from unittest.mock import patch, Mock import docker.errors from pydantic import ValidationError @@ -11,7 +11,9 @@ Runner, get_plugin_settings, match_nonallowlisted_raw_secrets, + process_event_info, temporary_volume, + get_truncated_hash, ) from utils.services import _get_services_from_file @@ -182,3 +184,83 @@ def test_temporary_volume_cleanup(self): raise ValueError("test") with self.assertRaises(docker.errors.NotFound): docker_client.volumes.get(vol_name) + + @patch("engine.utils.plugin.queue_event") + @patch("engine.utils.plugin.SECRETS_EVENTS_ENABLED", True) + @patch("engine.utils.plugin.PROCESS_SECRETS_WITH_PATH_EXCLUSIONS", True) + @patch("engine.utils.plugin.get_truncated_hash") + def test_process_event_info_secret_hash(self, mock_hash, mock_queue_event): + """ + Test that process_event_info generates a secret_hash when secret details exist, + and does not include secret_hash when no secret details are available. + """ + mock_hash.return_value = "abcd1234567890abcd1234" + + mock_scan = Mock() + mock_scan.repo.repo = "test-org/test-repo" + mock_scan.repo.service = "github" + mock_scan.ref = "main" + mock_scan.branch_last_commit_timestamp = "2025-12-01T00:00:00Z" + mock_scan.include_paths = None + mock_scan.exclude_paths = None + mock_scan.report_url = "https://example.com/report" + + # Test Result with secret details + results = { + "details": [ + { + "id": "secret-1", + "filename": "config.py", + "line": 42, + "commit": "abc123", + "author": "test-author", + "author-timestamp": "2025-12-01T00:00:00Z", + } + ], + "event_info": {"secret-1": {"type": "api-key", "match": ["sk-1234567890abcdef"]}}, + } + + process_event_info(mock_scan, results, "secrets", "test-plugin", False) + mock_hash.assert_called_once_with("sk-1234567890abcdef") + + # Verify that queue_event was called + self.assertTrue(mock_queue_event.called) + + # Verify that the payload includes the secret_hash + call_args = mock_queue_event.call_args[0] + payload = call_args[2] + self.assertIn("secret_hash", payload) + self.assertEqual(payload["secret_hash"], "abcd1234567890abcd1234") + + # Reset mocks + mock_queue_event.reset_mock() + mock_hash.reset_mock() + + # Modify test result + results["event_info"]["secret-1"]["match"] = [""] + + process_event_info(mock_scan, results, "secrets", "test-plugin", False) + mock_hash.assert_called_once_with("") + + # Verify that the payload does NOT include secret_hash + call_args = mock_queue_event.call_args[0] + payload = call_args[2] + self.assertNotIn("secret_hash", payload) + + @patch("engine.utils.plugin.AWSConnect") + def test_get_truncated_hash_with_pepper(self, mock_aws_connect): + """ + Test that get_truncated_hash uses the pepper from AWS secrets to generate hash. + """ + mock_aws_instance = Mock() + mock_aws_instance.get_secret_raw.return_value = ( + "11f450d9c976c012eeaac9eb8047ef5ad1963c12f8b928c6392d1306b9cf5796" + ) + mock_aws_connect.return_value = mock_aws_instance + + test_value = "test-secret-value" + result = get_truncated_hash(test_value) + + # Verify the same input produces the same output + result2 = get_truncated_hash(test_value) + self.assertEqual(result, result2) diff --git a/backend/engine/utils/plugin.py b/backend/engine/utils/plugin.py index 24b200289..b4642485c 100644 --- a/backend/engine/utils/plugin.py +++ b/backend/engine/utils/plugin.py @@ -11,6 +11,7 @@ import uuid import boto3 +import hashlib from botocore.exceptions import ClientError from django.db.models import Q from django.db import transaction @@ -19,6 +20,7 @@ from pydantic import BaseModel, Field, field_validator from artemisdb.artemisdb.models import PluginConfig, SecretType, PluginType, Scan +from artemislib.aws import AWSConnect from artemislib.github.app import GITHUB_APP_ID from artemislib.logging import Logger, LOG_LEVEL, inject_plugin_logs from artemislib.util import dict_eq @@ -499,6 +501,13 @@ def process_event_info(scan: Scan, results, plugin_type: str, plugin_name: str, org = scan.repo.repo repository = scan.repo.repo + secret_details = results["event_info"][item["id"]]["match"] + if isinstance(secret_details, list) and secret_details: + secret_details = str(secret_details[0]) + else: + secret_details = "" + secret_hash = get_truncated_hash(secret_details) + payload = { "timestamp": timestamp, "type": plugin_type, @@ -530,6 +539,10 @@ def process_event_info(scan: Scan, results, plugin_type: str, plugin_name: str, f"&st_resource={results['event_info'][item['id']]['type']}" # Filter on secrets type ), } + + # Add the secret_hash field if the secret details are available + if secret_details: + payload["secret_hash"] = secret_hash queue_event(scan.repo.repo, plugin_type, payload) elif plugin_type in PluginType.INVENTORY.value and INVENTORY_EVENTS_ENABLED: payload = { @@ -587,6 +600,17 @@ def queue_event(repo: str, plugin_type: str, payload: dict): log.error("Unable to queue %s event for %s", plugin_type, repo) +def get_truncated_hash(value: str, chars=24) -> str: + aws = AWSConnect() + pepper = aws.get_secret_raw(f"{APPLICATION}/pepper") + pepper = bytes.fromhex(pepper) + hash = hashlib.new("sha3_256") + hash.update(pepper) + hash.update(value.encode()) + + return hash.hexdigest()[:chars] + + def get_secret_raw_wl(scan): # Note: scan type is unspecified until we enable typechecking Django models. """ diff --git a/backend/terraform/modules/analyzer/secrets.tf b/backend/terraform/modules/analyzer/secrets.tf index 9f20039fd..e1613c188 100644 --- a/backend/terraform/modules/analyzer/secrets.tf +++ b/backend/terraform/modules/analyzer/secrets.tf @@ -354,6 +354,17 @@ resource "aws_secretsmanager_secret_version" "private_docker_repo_creds" { ]) } +resource "aws_secretsmanager_secret" "pepper" { + name = "${var.app}/pepper" + description = "Artemis Pepper for deduping secret results" +} +resource "aws_secretsmanager_secret_version" "artemis-pepper" { + secret_id = aws_secretsmanager_secret.pepper.id + secret_string = jsonencode({ + "key" : "REPLACEVALUE", + }) +} + ############################################################################### # Datadog ###############################################################################