Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion backend/engine/tests/test_engine_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from typing import Any
import unittest
from unittest.mock import patch
from unittest.mock import patch, Mock
import docker.errors
from pydantic import ValidationError

Expand All @@ -11,7 +11,9 @@
Runner,
get_plugin_settings,
match_nonallowlisted_raw_secrets,
process_event_info,
temporary_volume,
get_truncated_hash,
)
from utils.services import _get_services_from_file

Expand Down Expand Up @@ -182,3 +184,67 @@ def test_temporary_volume_cleanup(self):
raise ValueError("test")
with self.assertRaises(docker.errors.NotFound):
docker_client.volumes.get(vol_name)

@patch("engine.utils.plugin.queue_event")
@patch("engine.utils.plugin.SECRETS_EVENTS_ENABLED", True)
@patch("engine.utils.plugin.PROCESS_SECRETS_WITH_PATH_EXCLUSIONS", True)
@patch("engine.utils.plugin.get_truncated_hash")
def test_process_event_info_secret_hash(self, mock_hash, mock_queue_event):
"""
Test that process_event_info generates a secret_hash and passes it to queue_event.
"""
mock_hash.return_value = "abcd1234567890abcd1234"

mock_scan = Mock()
mock_scan.repo.repo = "test-org/test-repo"
mock_scan.repo.service = "github"
mock_scan.ref = "main"
mock_scan.branch_last_commit_timestamp = "2025-12-01T00:00:00Z"
mock_scan.include_paths = None
mock_scan.exclude_paths = None
mock_scan.report_url = "https://example.com/report"

# Dummy output from Secret scanner
results = {
"details": [
{
"id": "secret-1",
"filename": "config.py",
"line": 42,
"commit": "abc123",
"author": "test-author",
"author-timestamp": "2025-12-01T00:00:00Z",
}
],
"event_info": {"secret-1": {"type": "api-key", "match": ["sk-1234567890abcdef"]}},
}

process_event_info(mock_scan, results, "secrets", "test-plugin", False)
mock_hash.assert_called_once_with("sk-1234567890abcdef")

# Verify that queue_event was called
self.assertTrue(mock_queue_event.called)

# Verify that the payload includes the secret_hash
call_args = mock_queue_event.call_args[0]
payload = call_args[2]
self.assertIn("secret_hash", payload)
self.assertEqual(payload["secret_hash"], "abcd1234567890abcd1234")

@patch("engine.utils.plugin.AWSConnect")
def test_get_truncated_hash_with_pepper(self, mock_aws_connect):
"""
Test that get_truncated_hash uses the pepper from AWS secrets to generate hash.
"""
mock_aws_instance = Mock()
mock_aws_instance.get_secret_raw.return_value = (
"11f450d9c976c012eeaac9eb8047ef5ad1963c12f8b928c6392d1306b9cf5796"
)
mock_aws_connect.return_value = mock_aws_instance

test_value = "test-secret-value"
result = get_truncated_hash(test_value)

# Verify the same input produces the same output
result2 = get_truncated_hash(test_value)
self.assertEqual(result, result2)
21 changes: 21 additions & 0 deletions backend/engine/utils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uuid

import boto3
import hashlib
from botocore.exceptions import ClientError
from django.db.models import Q
from django.db import transaction
Expand All @@ -19,6 +20,7 @@
from pydantic import BaseModel, Field, field_validator

from artemisdb.artemisdb.models import PluginConfig, SecretType, PluginType, Scan
from artemislib.aws import AWSConnect
from artemislib.github.app import GITHUB_APP_ID
from artemislib.logging import Logger, LOG_LEVEL, inject_plugin_logs
from artemislib.util import dict_eq
Expand Down Expand Up @@ -499,6 +501,13 @@ def process_event_info(scan: Scan, results, plugin_type: str, plugin_name: str,
org = scan.repo.repo
repository = scan.repo.repo

secret_details = results["event_info"][item["id"]]["match"]
if isinstance(secret_details, list) and secret_details:
secret_details = str(secret_details[0])
else:
secret_details = ""
secret_hash = get_truncated_hash(secret_details)

payload = {
"timestamp": timestamp,
"type": plugin_type,
Expand All @@ -520,6 +529,7 @@ def process_event_info(scan: Scan, results, plugin_type: str, plugin_name: str,
"last_commit": scan.branch_last_commit_timestamp,
"state": item.get("state", "open"),
"validity": item.get("validity", "unknown"),
"secret_hash": secret_hash,
"secret_type": results["event_info"][item["id"]]["type"],
"secret_type_display_name": results["event_info"][item["id"]]["type"],
"report_url": (
Expand Down Expand Up @@ -587,6 +597,17 @@ def queue_event(repo: str, plugin_type: str, payload: dict):
log.error("Unable to queue %s event for %s", plugin_type, repo)


def get_truncated_hash(value: str, chars=24) -> str:
aws = AWSConnect()
pepper = aws.get_secret_raw(f"{APPLICATION}/pepper")
pepper = bytes.fromhex(pepper)
hash = hashlib.new("sha3_256")
hash.update(pepper)
hash.update(value.encode())

return hash.hexdigest()[:chars]


def get_secret_raw_wl(scan):
# Note: scan type is unspecified until we enable typechecking Django models.
"""
Expand Down
11 changes: 11 additions & 0 deletions backend/terraform/modules/analyzer/secrets.tf
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,17 @@ resource "aws_secretsmanager_secret_version" "private_docker_repo_creds" {
])
}

resource "aws_secretsmanager_secret" "pepper" {
name = "${var.app}/pepper"
description = "Artemis Pepper for deduping secret results"
}
resource "aws_secretsmanager_secret_version" "artemis-pepper" {
secret_id = aws_secretsmanager_secret.pepper.id
secret_string = jsonencode({
"key" : "REPLACEVALUE",
})
}

###############################################################################
# Datadog
###############################################################################
Expand Down