"""Utilities for detecting, reporting, and removing duplicate evidence entries."""

import logging
from typing import Any, Dict, List


class DuplicateEvidenceError(Exception):
    """Custom exception for duplicate evidence scenarios."""
    pass


def validate_unique_evidence(evidence_list: List[Dict[Any, Any]],
                             unique_key: str = 'id') -> None:
    """
    Validate that evidence entries are unique based on a specified key.

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        unique_key (str, optional): Key used to determine uniqueness. Defaults to 'id'.

    Raises:
        DuplicateEvidenceError: If duplicate evidence is detected
    """
    logger = logging.getLogger(__name__)

    seen_keys = set()
    duplicates = []

    for item in evidence_list:
        if unique_key not in item:
            # Entries without the key cannot participate in the uniqueness
            # check; warn and skip rather than fail.
            logger.warning("Evidence item missing unique key '%s': %s",
                           unique_key, item)
            continue

        current_key = item[unique_key]

        if current_key in seen_keys:
            # Report each offending key once, even if it appears 3+ times.
            if current_key not in duplicates:
                duplicates.append(current_key)
            logger.error("Duplicate evidence found with %s: %s",
                         unique_key, current_key)
        else:
            seen_keys.add(current_key)

    if duplicates:
        raise DuplicateEvidenceError(
            f"Duplicate evidence detected for {unique_key}s: {duplicates}"
        )


def log_evidence_summary(evidence_list: List[Dict[Any, Any]],
                         log_level: str = 'INFO') -> None:
    """
    Log a summary of evidence entries with configurable log level.

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        log_level (str, optional): Logging level name. Defaults to 'INFO'.
            Unknown level names silently fall back to INFO.
    """
    logger = logging.getLogger(__name__)
    # getattr with a default keeps an invalid level from raising.
    log_method = getattr(logger, log_level.lower(), logger.info)

    log_method(f"Total evidence entries: {len(evidence_list)}")
    log_method(f"Evidence keys: {list(evidence_list[0].keys()) if evidence_list else 'N/A'}")


def filter_duplicates(evidence_list: List[Dict[Any, Any]],
                      unique_key: str = 'id',
                      keep: str = 'first') -> List[Dict[Any, Any]]:
    """
    Filter out duplicate evidence entries while preserving desired entries.

    Entries that lack ``unique_key`` are always kept, and every surviving
    entry retains its original position in the list. (The previous
    implementation relocated key-less entries to the end of the result and
    used O(n^2) ``insert(0, ...)`` calls for the 'last' strategy.)

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        unique_key (str, optional): Key used to determine uniqueness. Defaults to 'id'.
        keep (str, optional): Strategy for keeping duplicates.
            'first' keeps first occurrence, 'last' keeps last.
            Defaults to 'first'.

    Returns:
        List[Dict[Any, Any]]: Filtered list of evidence without duplicates

    Raises:
        ValueError: If ``keep`` is neither 'first' nor 'last'.
    """
    if keep not in ('first', 'last'):
        raise ValueError("'keep' must be either 'first' or 'last'")

    # First pass: map each key to the index of the occurrence we keep.
    chosen_index: Dict[Any, int] = {}
    for index, item in enumerate(evidence_list):
        if unique_key not in item:
            continue
        key = item[unique_key]
        if keep == 'first':
            # setdefault only records the first index seen for a key.
            chosen_index.setdefault(key, index)
        else:  # keep == 'last': later occurrences overwrite earlier ones.
            chosen_index[key] = index

    keep_positions = set(chosen_index.values())

    # Second pass: emit survivors in original order; key-less entries
    # always survive, in place.
    return [
        item
        for index, item in enumerate(evidence_list)
        if unique_key not in item or index in keep_positions
    ]
"""Unit tests for prometheus_swarm.utils.duplicate_evidence."""

import logging
from typing import Dict, List

import pytest

from prometheus_swarm.utils.duplicate_evidence import (
    DuplicateEvidenceError,
    filter_duplicates,
    log_evidence_summary,
    validate_unique_evidence,
)


def test_validate_unique_evidence_no_duplicates():
    """Validation succeeds when every entry carries a distinct id."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 3, 'data': 'third'},
    ]

    try:
        validate_unique_evidence(records)
    except DuplicateEvidenceError:
        pytest.fail("Unexpected DuplicateEvidenceError raised")


def test_validate_unique_evidence_with_duplicates():
    """A repeated id triggers DuplicateEvidenceError."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]

    with pytest.raises(DuplicateEvidenceError, match="Duplicate evidence detected"):
        validate_unique_evidence(records)


def test_validate_unique_evidence_missing_key():
    """Entries without the unique key are tolerated (warning only, no raise)."""
    records = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
        {'id': 2, 'data': 'second'},
    ]

    validate_unique_evidence(records)


def test_log_evidence_summary(caplog):
    """The summary reports the entry count at INFO level."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]

    with caplog.at_level(logging.INFO):
        log_evidence_summary(records)

    assert "Total evidence entries: 2" in caplog.text


def test_filter_duplicates_first_occurrence():
    """With the default strategy, the earliest entry per id survives."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]

    result = filter_duplicates(records)

    assert len(result) == 2
    assert result == [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]


def test_filter_duplicates_last_occurrence():
    """With keep='last', the latest entry per id survives."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]

    result = filter_duplicates(records, keep='last')

    assert len(result) == 2
    assert result == [
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]


def test_filter_duplicates_invalid_keep_strategy():
    """An unknown keep strategy raises ValueError."""
    records = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]

    with pytest.raises(ValueError, match="'keep' must be either 'first' or 'last'"):
        filter_duplicates(records, keep='invalid')


def test_filter_duplicates_missing_key():
    """Entries lacking the unique key survive filtering."""
    records = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
        {'id': 1, 'data': 'duplicate'},
    ]

    result = filter_duplicates(records)

    assert len(result) == 2
    assert result == [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
    ]