Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions agent-framework/prometheus_swarm/utils/duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import logging
from typing import Any, List, Dict

class DuplicateEvidenceError(Exception):
    """Raised when duplicate evidence entries are detected and cannot be resolved."""

def check_duplicate_evidence(evidence_list: List[Dict[str, Any]],
                             unique_key: str = 'id') -> List[Dict[str, Any]]:
    """
    Check for duplicate evidence based on a unique key.

    Entries that lack ``unique_key`` cannot be deduplicated; they are kept
    in the result and a warning is logged for each.

    Args:
        evidence_list (List[Dict[str, Any]]): List of evidence dictionaries
        unique_key (str, optional): Key to use for identifying duplicates. Defaults to 'id'.

    Returns:
        List[Dict[str, Any]]: List of unique evidence entries

    Raises:
        DuplicateEvidenceError: If duplicate evidence is detected and cannot be resolved
    """
    # Do NOT configure levels/handlers here: a library function must not
    # mutate global logging state on every call; handler setup belongs to
    # the application. We only obtain the module logger.
    logger = logging.getLogger(__name__)

    # Track unique and duplicate evidence separately so the error can
    # report exactly which entries collided.
    unique_evidence: List[Dict[str, Any]] = []
    duplicate_evidence: List[Dict[str, Any]] = []
    seen_keys = set()

    for evidence in evidence_list:
        # Entries without the unique key are kept (best-effort) and flagged.
        if unique_key not in evidence:
            # Lazy %-style args avoid formatting cost when the level is disabled.
            logger.warning("Evidence missing unique key '%s': %s", unique_key, evidence)
            unique_evidence.append(evidence)
            continue

        unique_value = evidence[unique_key]

        # First occurrence wins; later occurrences are recorded as duplicates.
        if unique_value in seen_keys:
            duplicate_evidence.append(evidence)
            logger.warning("Duplicate evidence detected: %s", evidence)
        else:
            seen_keys.add(unique_value)
            unique_evidence.append(evidence)

    # Any duplicate is treated as unresolvable: log, then raise with details.
    if duplicate_evidence:
        logger.error("Found %d duplicate evidence entries", len(duplicate_evidence))
        raise DuplicateEvidenceError(f"Duplicate evidence detected: {duplicate_evidence}")

    return unique_evidence

def log_duplicate_evidence(duplicate_entries: List[Dict[str, Any]],
                           log_level: int = logging.WARNING) -> None:
    """Emit one log record per duplicate evidence entry.

    Args:
        duplicate_entries (List[Dict[str, Any]]): Duplicate evidence dictionaries to report.
        log_level (int, optional): Severity used for every record. Defaults to logging.WARNING.
    """
    module_logger = logging.getLogger(__name__)

    for duplicate in duplicate_entries:
        module_logger.log(log_level, f"Duplicate Evidence: {duplicate}")
74 changes: 74 additions & 0 deletions agent-framework/tests/unit/test_duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest
import logging
from prometheus_swarm.utils.duplicate_evidence import (
check_duplicate_evidence,
DuplicateEvidenceError,
log_duplicate_evidence
)

def test_check_duplicate_evidence_no_duplicates():
    """All ids distinct: every entry survives deduplication."""
    entries = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 3, 'data': 'third'},
    ]
    deduped = check_duplicate_evidence(entries)
    assert len(deduped) == 3

def test_check_duplicate_evidence_with_duplicates():
    """A repeated id must raise DuplicateEvidenceError."""
    entries = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]
    with pytest.raises(DuplicateEvidenceError):
        check_duplicate_evidence(entries)

def test_check_duplicate_evidence_custom_unique_key():
    """Deduplication honours a caller-supplied unique key."""
    records = [
        {'uuid': 'a1', 'data': 'first'},
        {'uuid': 'b2', 'data': 'second'},
        {'uuid': 'a1', 'data': 'duplicate'},
    ]
    with pytest.raises(DuplicateEvidenceError):
        check_duplicate_evidence(records, unique_key='uuid')

def test_check_duplicate_evidence_missing_key():
    """Entries lacking the unique key are retained, not dropped."""
    mixed = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
        {'id': 2, 'data': 'second'},
    ]
    deduped = check_duplicate_evidence(mixed)
    assert len(deduped) == 3

def test_log_duplicate_evidence(caplog):
    """Each duplicate entry produces one WARNING record."""
    caplog.set_level(logging.WARNING)
    duplicates = [
        {'id': 1, 'data': 'duplicate 1'},
        {'id': 2, 'data': 'duplicate 2'},
    ]

    log_duplicate_evidence(duplicates)

    records = caplog.records
    assert len(records) == 2
    for record in records:
        assert 'Duplicate Evidence' in record.message
        assert record.levelno == logging.WARNING

def test_log_duplicate_evidence_custom_log_level(caplog):
    """The log_level argument controls the severity of emitted records."""
    caplog.set_level(logging.ERROR)

    log_duplicate_evidence(
        [{'id': 1, 'data': 'duplicate 1'}],
        log_level=logging.ERROR,
    )

    record, = caplog.records
    assert record.levelno == logging.ERROR
    assert 'Duplicate Evidence' in record.message