diff --git a/agent-framework/prometheus_swarm/utils/nonce.py b/agent-framework/prometheus_swarm/utils/nonce.py new file mode 100644 index 00000000..d8fcfb87 --- /dev/null +++ b/agent-framework/prometheus_swarm/utils/nonce.py @@ -0,0 +1,78 @@ +""" +Nonce Configuration Management Module + +This module provides utilities for generating, managing, and validating nonces +to prevent replay attacks and ensure request uniqueness. +""" + +import hashlib +import time +import uuid +from typing import Dict, Optional + + +class NonceManager: + """ + Manages nonce generation, tracking, and validation. + + Attributes: + _used_nonces (Dict[str, float]): Tracks used nonces and their timestamps + _nonce_expiry_seconds (int): Expiration time for nonces + """ + + def __init__(self, nonce_expiry_seconds: int = 300): + """ + Initialize NonceManager. + + Args: + nonce_expiry_seconds (int, optional): Nonce expiration time. Defaults to 300 seconds. + """ + self._used_nonces: Dict[str, float] = {} + self._nonce_expiry_seconds = nonce_expiry_seconds + + def generate_nonce(self) -> str: + """ + Generate a unique, secure nonce. + + Returns: + str: A unique nonce string + """ + unique_input = f"{uuid.uuid4()}{time.time()}" + return hashlib.sha256(unique_input.encode()).hexdigest() + + def validate_nonce(self, nonce: str) -> bool: + """ + Validate a nonce, checking for uniqueness and expiration. + + Args: + nonce (str): Nonce to validate + + Returns: + bool: Whether the nonce is valid and can be used + """ + current_time = time.time() + + # Remove expired nonces + self._cleanup_expired_nonces(current_time) + + # Check if nonce has been used + if nonce in self._used_nonces: + return False + + # Mark nonce as used + self._used_nonces[nonce] = current_time + return True + + def _cleanup_expired_nonces(self, current_time: float) -> None: + """ + Remove nonces that have exceeded the expiration time. + + Args: + current_time (float): Current timestamp + """ + expired_nonces = [ + n for n, timestamp in self._used_nonces.items() + if current_time - timestamp > self._nonce_expiry_seconds + ] + for nonce in expired_nonces: + del self._used_nonces[nonce] \ No newline at end of file diff --git a/agent-framework/tests/unit/test_nonce.py b/agent-framework/tests/unit/test_nonce.py new file mode 100644 index 00000000..b4ee9fdd --- /dev/null +++ b/agent-framework/tests/unit/test_nonce.py @@ -0,0 +1,79 @@ +""" +Unit tests for Nonce Configuration Management +""" + +import time +import pytest +from prometheus_swarm.utils.nonce import NonceManager + + +def test_nonce_generation(): + """Test that nonce generation returns unique values.""" + nonce_manager = NonceManager() + nonce1 = nonce_manager.generate_nonce() + nonce2 = nonce_manager.generate_nonce() + + assert nonce1 != nonce2 + assert len(nonce1) == 64 # SHA-256 produces 64-character hex string + assert isinstance(nonce1, str) + + +def test_nonce_validation(): + """Test nonce validation logic.""" + nonce_manager = NonceManager() + nonce = nonce_manager.generate_nonce() + + # First validation should succeed + assert nonce_manager.validate_nonce(nonce) is True + + # Second validation of same nonce should fail + assert nonce_manager.validate_nonce(nonce) is False + + +def test_nonce_expiration(): + """Test nonce expiration mechanism.""" + class MockNonceManager(NonceManager): + def __init__(self, nonce_expiry_seconds): + super().__init__(nonce_expiry_seconds) + + # Create a nonce manager with very short expiry + nonce_manager = MockNonceManager(nonce_expiry_seconds=1) + nonce = nonce_manager.generate_nonce() + + # Wait for nonce to expire + time.sleep(1.1) + + # Nonce should now be re-usable + assert nonce_manager.validate_nonce(nonce) is True + + +def test_multiple_nonce_tracking(): + """Test tracking multiple unique nonces.""" + nonce_manager = NonceManager() + nonces = [nonce_manager.generate_nonce() for _ in range(10)] + + # All nonces should be valid on first validation + validation_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces] + assert all(validation_results) + + # None should be valid on second validation + validation_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces] + assert not any(validation_results) + + +def test_nonce_cleanup(): + """Test internal nonce cleanup mechanism.""" + nonce_manager = NonceManager(nonce_expiry_seconds=1) + + # Generate some nonces + nonces = [nonce_manager.generate_nonce() for _ in range(5)] + + # Wait for expiry + time.sleep(1.1) + + # Trigger cleanup by validating a new nonce + new_nonce = nonce_manager.generate_nonce() + + # All previous nonces should now be re-usable + reusable_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces] + assert all(reusable_results) \ No newline at end of file