Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions agent-framework/prometheus_swarm/utils/duplicate_evidence_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Any, Dict, List, Optional

class DuplicateEvidenceError(Exception):
    """Signals that a duplicate evidence entry was encountered."""

class DuplicateEvidenceHandler:
    """
    Detect, log, and strip duplicate evidence entries.

    Evidence entries are dictionaries. Two entries count as duplicates when
    they hold equal values under the chosen identifier key. Entries whose
    identifier is missing (or ``None``) are logged and never treated as
    duplicates.

    This class provides methods to:
    - Check for duplicate evidence
    - Log duplicate evidence occurrences
    - Optionally raise exceptions for duplicates
    """

    def __init__(self, logger: Optional[logging.Logger] = None, raise_on_duplicate: bool = False):
        """
        Initialize the DuplicateEvidenceHandler.

        Args:
            logger (logging.Logger, optional): Logger that receives the
                warning/info messages. If None, the module-level logger
                ``logging.getLogger(__name__)`` is used.
            raise_on_duplicate (bool, optional): Whether check_duplicates
                raises on the first duplicate it meets. Defaults to False.
        """
        # Compare against None explicitly so a caller-supplied logger is
        # never discarded merely because it evaluates as falsy.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.raise_on_duplicate = raise_on_duplicate

    def _identifier_of(self, evidence: Dict[str, Any], identifier_key: str) -> Any:
        """Return the entry's identifier, logging a warning when it is missing/None."""
        identifier = evidence.get(identifier_key)
        if identifier is None:
            self.logger.warning(
                "Evidence missing identifier key '%s': %s", identifier_key, evidence
            )
        return identifier

    @staticmethod
    def _mark_seen(identifier: Any, seen_hashable: set, seen_unhashable: list) -> bool:
        """Record identifier; return True on first sighting, False on a repeat."""
        try:
            if identifier in seen_hashable:
                return False
            seen_hashable.add(identifier)
            return True
        except TypeError:
            # Unhashable identifier (e.g. a list or dict): a set cannot hold
            # it, so fall back to an equality scan over a plain list instead
            # of crashing the whole pass.
            if identifier in seen_unhashable:
                return False
            seen_unhashable.append(identifier)
            return True

    def check_duplicates(self, evidence_list: List[Dict[str, Any]],
                         identifier_key: str = 'id') -> List[Dict[str, Any]]:
        """
        Check for duplicate evidence in the given list.

        The first entry carrying a given identifier is considered the
        original; every later entry with an equal identifier is reported.

        Args:
            evidence_list (List[Dict[str, Any]]): List of evidence dictionaries
            identifier_key (str, optional): Key to use for identifying unique evidence.
                Defaults to 'id'.

        Returns:
            List[Dict[str, Any]]: The duplicate entries, in input order.
                Entries without an identifier are skipped.

        Raises:
            DuplicateEvidenceError: If raise_on_duplicate is True and duplicates are found
        """
        seen_hashable: set = set()
        seen_unhashable: list = []
        duplicates = []

        for evidence in evidence_list:
            identifier = self._identifier_of(evidence, identifier_key)
            if identifier is None:
                continue

            if self._mark_seen(identifier, seen_hashable, seen_unhashable):
                continue  # first occurrence, nothing to report

            self.logger.warning("Duplicate evidence found: %s", evidence)
            duplicates.append(evidence)

            # Fail fast on the first duplicate when configured to do so.
            if self.raise_on_duplicate:
                raise DuplicateEvidenceError(f"Duplicate evidence with {identifier_key}='{identifier}'")

        return duplicates

    def remove_duplicates(self, evidence_list: List[Dict[str, Any]],
                          identifier_key: str = 'id') -> List[Dict[str, Any]]:
        """
        Remove duplicate evidence from the list, keeping first occurrence.

        The input list is not modified. Entries without an identifier are
        always kept, since they cannot be matched against anything.

        Args:
            evidence_list (List[Dict[str, Any]]): List of evidence dictionaries
            identifier_key (str, optional): Key to use for identifying unique evidence.
                Defaults to 'id'.

        Returns:
            List[Dict[str, Any]]: List of evidence without duplicates, in input order
        """
        seen_hashable: set = set()
        seen_unhashable: list = []
        unique_evidence = []

        for evidence in evidence_list:
            identifier = self._identifier_of(evidence, identifier_key)

            if identifier is None or self._mark_seen(identifier, seen_hashable, seen_unhashable):
                unique_evidence.append(evidence)
            else:
                self.logger.info("Removing duplicate evidence: %s", evidence)

        return unique_evidence
81 changes: 81 additions & 0 deletions agent-framework/tests/unit/test_duplicate_evidence_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import pytest
import logging
from prometheus_swarm.utils.duplicate_evidence_handler import (
DuplicateEvidenceHandler,
DuplicateEvidenceError
)

def test_no_duplicates():
    """Distinct ids must produce an empty duplicate report."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]

    checker = DuplicateEvidenceHandler()
    found = checker.check_duplicates(records)

    assert len(found) == 0

def test_with_duplicates():
    """A repeated id is reported exactly once, as the later entry."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 1, 'data': 'duplicate'},
        {'id': 2, 'data': 'third'},
    ]

    checker = DuplicateEvidenceHandler()
    found = checker.check_duplicates(records)

    assert len(found) == 1
    assert found[0]['id'] == 1

def test_raise_on_duplicate():
    """With raise_on_duplicate enabled, a repeated id raises DuplicateEvidenceError."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 1, 'data': 'duplicate'},
    ]
    strict_checker = DuplicateEvidenceHandler(raise_on_duplicate=True)

    with pytest.raises(DuplicateEvidenceError):
        strict_checker.check_duplicates(records)

def test_remove_duplicates():
    """Only the first entry for each id survives removal, in input order."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 1, 'data': 'duplicate'},
        {'id': 2, 'data': 'third'},
        {'id': 2, 'data': 'another duplicate'},
    ]

    checker = DuplicateEvidenceHandler()
    kept = checker.remove_duplicates(records)

    assert len(kept) == 2
    kept_ids = [entry['id'] for entry in kept]
    assert kept_ids == [1, 2]

def test_custom_identifier_key():
    """Duplicates are detected under a caller-chosen identifier key."""
    records = [
        {'uuid': 'abc', 'data': 'first'},
        {'uuid': 'abc', 'data': 'duplicate'},
        {'uuid': 'def', 'data': 'third'},
    ]

    checker = DuplicateEvidenceHandler()
    found = checker.check_duplicates(records, identifier_key='uuid')

    assert len(found) == 1
    assert found[0]['uuid'] == 'abc'

def test_evidence_without_identifier():
    """An entry lacking the identifier key is not counted as a duplicate."""
    records = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
        {'id': 1, 'data': 'duplicate'},
    ]

    checker = DuplicateEvidenceHandler()
    found = checker.check_duplicates(records)

    # The id-less entry is skipped; only the repeated id=1 entry is reported.
    assert len(found) == 1