Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions agent-framework/prometheus_swarm/utils/duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from typing import List, Dict, Any, Optional

class DuplicateEvidenceError(Exception):
    """Signals that an evidence list contains entries sharing an identifier."""

def detect_duplicate_evidence(evidence_list: List[Dict[str, Any]],
                              identifier_key: str = 'id',
                              logger: Optional[logging.Logger] = None,
                              *,
                              raise_on_duplicate: bool = True) -> List[Dict[str, Any]]:
    """
    Detect duplicate evidence in a list of evidence dictionaries.

    Entries are compared by the value stored under ``identifier_key``. The
    first entry seen for each identifier is kept; every later entry with the
    same identifier counts as a duplicate. Entries without an identifier (key
    missing or value ``None``) cannot be compared, so they are logged with a
    warning and always kept.

    Args:
        evidence_list (List[Dict[str, Any]]): List of evidence dictionaries.
        identifier_key (str, optional): Key used to identify unique evidence.
            Defaults to 'id'.
        logger (Optional[logging.Logger], optional): Logger for recording
            duplicate evidence. If not provided, the module-level logger
            (``logging.getLogger(__name__)``) is used.
        raise_on_duplicate (bool, optional): Keyword-only. When True (the
            default), raise if any duplicate was found. When False, log the
            duplicates and return the de-duplicated list instead.

    Returns:
        List[Dict[str, Any]]: The entries in their original order, with only
        the first occurrence of each identifier kept.

    Raises:
        DuplicateEvidenceError: If at least one duplicate was found and
            ``raise_on_duplicate`` is True.
        TypeError: If an identifier value is unhashable, because seen
            identifiers are tracked in a set.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    unique_evidence = []
    seen_identifiers = set()
    # Only the number of duplicates is ever reported, so a counter is enough.
    duplicate_count = 0

    for evidence in evidence_list:
        identifier = evidence.get(identifier_key)

        if identifier is None:
            # Without an identifier there is nothing to compare against;
            # keep the entry rather than silently dropping it.
            logger.warning("Evidence missing identifier key '%s': %s",
                           identifier_key, evidence)
            unique_evidence.append(evidence)
            continue

        if identifier in seen_identifiers:
            duplicate_count += 1
            logger.warning("Duplicate evidence detected for %s: %s",
                           identifier_key, identifier)
            continue

        seen_identifiers.add(identifier)
        unique_evidence.append(evidence)

    if duplicate_count:
        logger.error("Total duplicate evidence: %d", duplicate_count)
        if raise_on_duplicate:
            raise DuplicateEvidenceError(
                f"Found {duplicate_count} duplicate evidence entries"
            )

    return unique_evidence
54 changes: 54 additions & 0 deletions agent-framework/tests/unit/test_duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
import logging
from prometheus_swarm.utils.duplicate_evidence import detect_duplicate_evidence, DuplicateEvidenceError

def test_no_duplicate_evidence():
    """Entries with distinct ids are all returned and nothing is raised."""
    entries = [
        {'id': 1, 'data': 'first entry'},
        {'id': 2, 'data': 'second entry'},
    ]
    assert len(detect_duplicate_evidence(entries)) == 2

def test_duplicate_evidence_raises_error():
    """Two entries sharing the same id make the detector raise."""
    original = {'id': 1, 'data': 'first entry'}
    repeat = {'id': 1, 'data': 'duplicate entry'}
    with pytest.raises(DuplicateEvidenceError):
        detect_duplicate_evidence([original, repeat])

def test_handles_missing_identifier():
    """An entry lacking the identifier key is kept alongside the others."""
    with_id = {'id': 1, 'data': 'first entry'}
    without_id = {'data': 'entry without id'}
    kept = detect_duplicate_evidence([with_id, without_id])
    assert len(kept) == 2

def test_custom_identifier_key():
    """Duplicates are detected under a caller-chosen identifier key."""
    entries = [
        {'custom_id': 'A', 'data': 'first entry'},
        {'custom_id': 'B', 'data': 'second entry'},
        {'custom_id': 'A', 'data': 'duplicate entry'},
    ]
    with pytest.raises(DuplicateEvidenceError):
        detect_duplicate_evidence(entries, identifier_key='custom_id')

def test_logging_duplicate_evidence(caplog):
    """A warning mentioning the duplicate is captured before the error is raised."""
    caplog.set_level(logging.WARNING)
    original = {'id': 1, 'data': 'first entry'}
    repeat = {'id': 1, 'data': 'duplicate entry'}

    with pytest.raises(DuplicateEvidenceError):
        detect_duplicate_evidence([original, repeat])

    assert "Duplicate evidence detected" in caplog.text