diff --git a/agent-framework/prometheus_swarm/utils/duplicate_evidence.py b/agent-framework/prometheus_swarm/utils/duplicate_evidence.py
new file mode 100644
index 00000000..fdd3f843
--- /dev/null
+++ b/agent-framework/prometheus_swarm/utils/duplicate_evidence.py
@@ -0,0 +1,64 @@
+import logging
+from typing import Any, Dict, List
+
+# Module-level logger: handler/level configuration is left to the application,
+# so importing or calling this module never mutates global logging state.
+logger = logging.getLogger(__name__)
+
+
+class DuplicateEvidenceError(Exception):
+    """Raised when duplicate evidence entries are detected."""
+
+
+def check_duplicate_evidence(evidence_list: List[Dict[str, Any]],
+                             unique_key: str = 'id') -> List[Dict[str, Any]]:
+    """
+    Check for duplicate evidence based on a unique key.
+
+    Args:
+        evidence_list (List[Dict[str, Any]]): List of evidence dictionaries
+        unique_key (str, optional): Key to use for identifying duplicates. Defaults to 'id'.
+
+    Returns:
+        List[Dict[str, Any]]: List of unique evidence entries
+
+    Raises:
+        DuplicateEvidenceError: If duplicate evidence is detected
+    """
+    unique_evidence: List[Dict[str, Any]] = []
+    duplicate_evidence: List[Dict[str, Any]] = []
+    seen_keys = set()
+
+    for evidence in evidence_list:
+        # Entries without the unique key cannot be deduplicated; keep them and warn.
+        if unique_key not in evidence:
+            logger.warning("Evidence missing unique key '%s': %s", unique_key, evidence)
+            unique_evidence.append(evidence)
+            continue
+
+        unique_value = evidence[unique_key]
+        if unique_value in seen_keys:
+            duplicate_evidence.append(evidence)
+            logger.warning("Duplicate evidence detected: %s", evidence)
+        else:
+            seen_keys.add(unique_value)
+            unique_evidence.append(evidence)
+
+    # Any duplicate is treated as an error; callers catch DuplicateEvidenceError.
+    if duplicate_evidence:
+        logger.error("Found %d duplicate evidence entries", len(duplicate_evidence))
+        raise DuplicateEvidenceError(f"Duplicate evidence detected: {duplicate_evidence}")
+
+    return unique_evidence
+
+
+def log_duplicate_evidence(duplicate_entries: List[Dict[str, Any]],
+                           log_level: int = logging.WARNING) -> None:
+    """
+    Log duplicate evidence entries with specified log level.
+
+    Args:
+        duplicate_entries (List[Dict[str, Any]]): List of duplicate evidence entries
+        log_level (int, optional): Logging level. Defaults to logging.WARNING.
+    """
+    for entry in duplicate_entries:
+        logger.log(log_level, "Duplicate Evidence: %s", entry)
diff --git a/agent-framework/tests/unit/test_duplicate_evidence.py b/agent-framework/tests/unit/test_duplicate_evidence.py
new file mode 100644
index 00000000..67e7477b
--- /dev/null
+++ b/agent-framework/tests/unit/test_duplicate_evidence.py
@@ -0,0 +1,74 @@
+import pytest
+import logging
+from prometheus_swarm.utils.duplicate_evidence import (
+    check_duplicate_evidence,
+    DuplicateEvidenceError,
+    log_duplicate_evidence
+)
+
+def test_check_duplicate_evidence_no_duplicates():
+    """Test checking evidence with no duplicates."""
+    evidence_list = [
+        {'id': 1, 'data': 'first'},
+        {'id': 2, 'data': 'second'},
+        {'id': 3, 'data': 'third'}
+    ]
+    result = check_duplicate_evidence(evidence_list)
+    assert len(result) == 3
+
+def test_check_duplicate_evidence_with_duplicates():
+    """Test checking evidence with duplicates."""
+    evidence_list = [
+        {'id': 1, 'data': 'first'},
+        {'id': 2, 'data': 'second'},
+        {'id': 1, 'data': 'duplicate'}
+    ]
+    with pytest.raises(DuplicateEvidenceError):
+        check_duplicate_evidence(evidence_list)
+
+def test_check_duplicate_evidence_custom_unique_key():
+    """Test checking evidence with a custom unique key."""
+    evidence_list = [
+        {'uuid': 'a1', 'data': 'first'},
+        {'uuid': 'b2', 'data': 'second'},
+        {'uuid': 'a1', 'data': 'duplicate'}
+    ]
+    with pytest.raises(DuplicateEvidenceError):
+        check_duplicate_evidence(evidence_list, unique_key='uuid')
+
+def test_check_duplicate_evidence_missing_key():
+    """Test handling evidence with missing unique key."""
+    evidence_list = [
+        {'id': 1, 'data': 'first'},
+        {'data': 'no id'},
+        {'id': 2, 'data': 'second'}
+    ]
+    result = check_duplicate_evidence(evidence_list)
+    assert len(result) == 3
+
+def test_log_duplicate_evidence(caplog):
+    """Test logging of duplicate evidence."""
+    caplog.set_level(logging.WARNING)
+    duplicate_entries = [
+        {'id': 1, 'data': 'duplicate 1'},
+        {'id': 2, 'data': 'duplicate 2'}
+    ]
+
+    log_duplicate_evidence(duplicate_entries)
+
+    assert len(caplog.records) == 2
+    assert all('Duplicate Evidence' in record.message for record in caplog.records)
+    assert all(record.levelno == logging.WARNING for record in caplog.records)
+
+def test_log_duplicate_evidence_custom_log_level(caplog):
+    """Test logging of duplicate evidence with custom log level."""
+    caplog.set_level(logging.ERROR)
+    duplicate_entries = [
+        {'id': 1, 'data': 'duplicate 1'}
+    ]
+
+    log_duplicate_evidence(duplicate_entries, log_level=logging.ERROR)
+
+    assert len(caplog.records) == 1
+    assert caplog.records[0].levelno == logging.ERROR
+    assert 'Duplicate Evidence' in caplog.records[0].message