"""Utilities for detecting and logging duplicate evidence."""

import logging
from typing import Any, Dict, Iterable, List, Optional


class DuplicateEvidenceError(Exception):
    """Raised when evidence entries share the same identifier.

    Attributes:
        duplicates: The rejected entries, i.e. every entry whose identifier
            had already appeared earlier in the inspected list. Empty when
            the error is raised without that detail.
    """

    def __init__(
        self,
        *args: Any,
        duplicates: Optional[Iterable[Dict[str, Any]]] = None,
    ) -> None:
        # Positional args are forwarded untouched so existing
        # ``DuplicateEvidenceError("message")`` call sites keep working.
        super().__init__(*args)
        self.duplicates: List[Dict[str, Any]] = (
            list(duplicates) if duplicates is not None else []
        )


def _seen_before(identifier: Any, hashable_seen: set, unhashable_seen: list) -> bool:
    """Record *identifier* and report whether it had already been recorded."""
    try:
        hash(identifier)
    except TypeError:
        # Unhashable identifiers (e.g. lists, dicts) cannot be stored in a
        # set, so fall back to an equality scan over the ones seen so far.
        if identifier in unhashable_seen:
            return True
        unhashable_seen.append(identifier)
        return False

    if identifier in hashable_seen:
        return True
    hashable_seen.add(identifier)
    return False


def detect_duplicate_evidence(
    evidence_list: List[Dict[str, Any]],
    identifier_key: str = 'id',
    logger: Optional[logging.Logger] = None,
    *,
    raise_on_duplicate: bool = True,
) -> List[Dict[str, Any]]:
    """Detect, log and optionally reject duplicate evidence entries.

    Entries are compared on the value stored under ``identifier_key``. The
    first entry carrying a given identifier is kept; every later entry with
    the same identifier counts as a duplicate and is logged at WARNING level.
    Entries whose identifier is missing or ``None`` are logged at WARNING
    level and always kept, since there is nothing to compare them on.

    Args:
        evidence_list: Evidence dictionaries to inspect. Not modified.
        identifier_key: Key whose value identifies an entry. Defaults to
            ``'id'``.
        logger: Logger used for reporting. Defaults to this module's logger
            (``logging.getLogger(__name__)``).
        raise_on_duplicate: When ``True`` (the default) raise
            ``DuplicateEvidenceError`` if any duplicate was found. When
            ``False`` only log and return the de-duplicated list.

    Returns:
        The entries of ``evidence_list`` in their original order with the
        duplicates removed.

    Raises:
        DuplicateEvidenceError: If at least one duplicate was found and
            ``raise_on_duplicate`` is true. The rejected entries are
            available on the exception's ``duplicates`` attribute.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    unique_evidence: List[Dict[str, Any]] = []
    duplicate_evidence: List[Dict[str, Any]] = []

    # Identifiers already encountered, split by whether they can live in a set.
    seen_hashable: set = set()
    seen_unhashable: list = []

    for evidence in evidence_list:
        identifier = evidence.get(identifier_key)

        if identifier is None:
            # No identifier to compare on, so this can never be a duplicate.
            logger.warning(
                "Evidence missing identifier key '%s': %s",
                identifier_key,
                evidence,
            )
            unique_evidence.append(evidence)
            continue

        if _seen_before(identifier, seen_hashable, seen_unhashable):
            duplicate_evidence.append(evidence)
            logger.warning(
                "Duplicate evidence detected for %s: %s",
                identifier_key,
                identifier,
            )
        else:
            unique_evidence.append(evidence)

    if duplicate_evidence:
        logger.error("Total duplicate evidence: %d", len(duplicate_evidence))
        if raise_on_duplicate:
            raise DuplicateEvidenceError(
                f"Found {len(duplicate_evidence)} duplicate evidence entries",
                duplicates=duplicate_evidence,
            )

    return unique_evidence
"""Unit tests for the duplicate evidence detection utility."""

import logging

import pytest

from prometheus_swarm.utils.duplicate_evidence import (
    DuplicateEvidenceError,
    detect_duplicate_evidence,
)


def test_no_duplicate_evidence():
    """Entries with distinct identifiers are returned unchanged and in order."""
    evidence_list = [
        {'id': 1, 'data': 'first entry'},
        {'id': 2, 'data': 'second entry'},
    ]
    # Compare content, not just length, so dropped or reordered entries fail.
    assert detect_duplicate_evidence(evidence_list) == evidence_list


def test_duplicate_evidence_raises_error():
    """A repeated identifier raises and reports how many duplicates were found."""
    evidence_list = [
        {'id': 1, 'data': 'first entry'},
        {'id': 1, 'data': 'duplicate entry'},
    ]
    with pytest.raises(DuplicateEvidenceError, match="Found 1 duplicate evidence entries"):
        detect_duplicate_evidence(evidence_list)


def test_handles_missing_identifier(caplog):
    """Entries without the identifier key are kept and reported as a warning."""
    caplog.set_level(logging.WARNING)

    evidence_list = [
        {'id': 1, 'data': 'first entry'},
        {'data': 'entry without id'},
    ]
    assert detect_duplicate_evidence(evidence_list) == evidence_list
    assert "Evidence missing identifier key 'id'" in caplog.text


def test_custom_identifier_key():
    """Duplicates are detected on a caller-supplied identifier key."""
    evidence_list = [
        {'custom_id': 'A', 'data': 'first entry'},
        {'custom_id': 'B', 'data': 'second entry'},
        {'custom_id': 'A', 'data': 'duplicate entry'},
    ]
    with pytest.raises(DuplicateEvidenceError, match="Found 1 duplicate evidence entries"):
        detect_duplicate_evidence(evidence_list, identifier_key='custom_id')


def test_logging_duplicate_evidence(caplog):
    """Each duplicate is logged, followed by the total duplicate count."""
    caplog.set_level(logging.WARNING)

    evidence_list = [
        {'id': 1, 'data': 'first entry'},
        {'id': 1, 'data': 'duplicate entry'},
    ]

    with pytest.raises(DuplicateEvidenceError):
        detect_duplicate_evidence(evidence_list)

    assert "Duplicate evidence detected for id: 1" in caplog.text
    assert "Total duplicate evidence: 1" in caplog.text