Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions agent-framework/prometheus_swarm/utils/duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
from typing import Any, List, Dict, Optional

logger = logging.getLogger(__name__)


class DuplicateEvidenceError(Exception):
    """Raised when duplicate evidence items are found and duplicates are not allowed."""


def detect_duplicate_evidence(evidence_list: List[Dict[str, Any]],
                              unique_keys: Optional[List[str]] = None,
                              allow_duplicates: bool = False) -> List[Dict[str, Any]]:
    """
    Detect and handle duplicate evidence in a list of evidence items.

    Two items count as duplicates when they compare equal (``==``) on every
    key in ``unique_keys``; a key absent from an item compares as ``None``.
    Detection is an equality scan (O(n^2)) so that unhashable values such as
    nested dicts are supported.

    Args:
        evidence_list (List[Dict[str, Any]]): List of evidence items to check.
        unique_keys (Optional[List[str]], optional): Keys to use for the
            uniqueness check. Defaults to the keys of the first evidence item.
        allow_duplicates (bool, optional): If True, duplicates are logged but
            tolerated. Defaults to False.

    Returns:
        List[Dict[str, Any]]: When ``allow_duplicates`` is True, the original
        ``evidence_list`` unchanged (duplicates included). Otherwise the
        de-duplicated list — which, since duplicates raise, equals the input.

    Raises:
        DuplicateEvidenceError: If duplicates are found and
            ``allow_duplicates`` is False.
    """
    if not evidence_list:
        logger.info("Empty evidence list provided")
        return []

    # Fall back to the first item's keys when no explicit key set is given.
    if unique_keys is None:
        unique_keys = list(evidence_list[0].keys())

    unique_evidence: List[Dict[str, Any]] = []
    duplicate_evidence: List[Dict[str, Any]] = []

    for evidence in evidence_list:
        # Equality-based scan keeps support for unhashable values
        # (e.g. nested dicts) at the cost of quadratic comparisons.
        is_duplicate = any(
            all(evidence.get(key) == existing.get(key) for key in unique_keys)
            for existing in unique_evidence
        )

        if is_duplicate:
            duplicate_evidence.append(evidence)
            # Lazy %-args avoid formatting when the level is disabled.
            logger.warning("Duplicate evidence detected: %s", evidence)
        else:
            unique_evidence.append(evidence)

    if duplicate_evidence:
        logger.info("Found %d duplicate evidence items", len(duplicate_evidence))

    if not allow_duplicates and duplicate_evidence:
        raise DuplicateEvidenceError(f"Found {len(duplicate_evidence)} duplicate evidence items")

    return unique_evidence if not allow_duplicates else evidence_list

def log_duplicate_evidence(evidence_list: List[Dict[str, Any]],
                           log_level: int = logging.WARNING,
                           unique_keys: Optional[List[str]] = None) -> None:
    """
    Log information about duplicate evidence without removing them.

    Items are fingerprinted by the ``repr`` of their values under
    ``unique_keys``, so unhashable values (e.g. nested dicts) are supported
    while still distinguishing values with identical string forms.

    Args:
        evidence_list (List[Dict[str, Any]]): List of evidence items to log.
        log_level (int, optional): Logging level. Defaults to logging.WARNING.
        unique_keys (Optional[List[str]], optional): Keys used for duplicate
            detection. Defaults to the keys of the first evidence item.
    """
    if not unique_keys:
        # Use all keys of the first item if not specified.
        unique_keys = list(evidence_list[0].keys()) if evidence_list else []

    seen_fingerprints: set = set()
    duplicates: List[Dict[str, Any]] = []

    for evidence in evidence_list:
        # repr() (rather than str()) keeps 1 and "1" distinct, and the
        # sentinel keeps a missing key distinct from an empty-string value
        # (repr of any real string is quoted, so it cannot collide).
        fingerprint = tuple(
            repr(evidence[key]) if key in evidence else "<missing>"
            for key in unique_keys
        )

        if fingerprint in seen_fingerprints:
            duplicates.append(evidence)
        else:
            seen_fingerprints.add(fingerprint)

    # Report duplicates through the logging framework; lazy %-args avoid
    # formatting when the level is disabled.
    if duplicates:
        logger.log(log_level, "Total duplicate evidence found: %d", len(duplicates))
        for dup in duplicates:
            logger.log(log_level, "Duplicate Evidence: %s", dup)
74 changes: 74 additions & 0 deletions agent-framework/tests/unit/test_duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest
import logging
from prometheus_swarm.utils.duplicate_evidence import (
detect_duplicate_evidence,
log_duplicate_evidence,
DuplicateEvidenceError
)

def test_detect_no_duplicates():
    """Two distinct items pass through untouched."""
    items = [
        {"id": 1, "name": "Item1"},
        {"id": 2, "name": "Item2"},
    ]
    deduped = detect_duplicate_evidence(items)
    assert len(deduped) == 2

def test_detect_duplicates_with_custom_keys():
    """Colliding id/name with allow_duplicates=True returns the full list."""
    items = [
        {"id": 1, "name": "Item1", "extra": "data1"},
        {"id": 1, "name": "Item1", "extra": "data2"},
    ]
    kept = detect_duplicate_evidence(
        items, unique_keys=["id", "name"], allow_duplicates=True
    )
    assert len(kept) == 2

def test_detect_duplicates_raises_error():
    """Duplicates must raise when allow_duplicates is left at its default."""
    items = [{"id": 1, "name": "Item1"}, {"id": 1, "name": "Item1"}]
    with pytest.raises(DuplicateEvidenceError):
        detect_duplicate_evidence(items)

def test_detect_duplicates_allowed():
    """allow_duplicates=True keeps every item, duplicates included."""
    items = [{"id": 1, "name": "Item1"}, {"id": 1, "name": "Item1"}]
    kept = detect_duplicate_evidence(items, allow_duplicates=True)
    assert len(kept) == 2

def test_detect_empty_list():
    """An empty evidence list is handled gracefully."""
    outcome = detect_duplicate_evidence([])
    assert len(outcome) == 0

def test_log_duplicate_evidence(caplog):
    """Duplicates are reported through the logging framework."""
    caplog.set_level(logging.WARNING)

    items = [{"id": 1, "name": "Item1"}, {"id": 1, "name": "Item1"}]
    log_duplicate_evidence(items)

    assert "Total duplicate evidence found: 1" in caplog.text
    assert "Duplicate Evidence:" in caplog.text

def test_custom_unique_key_matching():
    """Category collisions are tolerated when allow_duplicates=True."""
    items = [
        {"id": 1, "category": "alpha", "value": 100},
        {"id": 2, "category": "alpha", "value": 200},
        {"id": 3, "category": "beta", "value": 300},
    ]

    kept = detect_duplicate_evidence(
        items, unique_keys=["category"], allow_duplicates=True
    )
    assert len(kept) == 3

def test_complex_object_duplicates():
    """Nested (unhashable) values are compared without error."""
    items = [
        {"id": 1, "details": {"type": "A", "value": 10}},
        {"id": 1, "details": {"type": "A", "value": 10}},
    ]

    kept = detect_duplicate_evidence(items, allow_duplicates=True)
    assert len(kept) == 2