diff --git a/agent-framework/prometheus_swarm/nonce_interface.py b/agent-framework/prometheus_swarm/nonce_interface.py new file mode 100644 index 00000000..a01a695b --- /dev/null +++ b/agent-framework/prometheus_swarm/nonce_interface.py @@ -0,0 +1,98 @@ +import uuid +import time +from typing import Dict, Optional, Any + +class NonceRequestInterface: + """ + A class to manage nonce (number used once) generation and validation. + + Attributes: + _nonce_store (Dict[str, Dict[str, Any]]): A store to track generated nonces. + _nonce_expiry_seconds (int): Duration for which a nonce remains valid. + """ + + def __init__(self, nonce_expiry_seconds: int = 300): + """ + Initialize the NonceRequestInterface. + + Args: + nonce_expiry_seconds (int, optional): Time in seconds after which a nonce expires. + Defaults to 300 seconds (5 minutes). + """ + self._nonce_store: Dict[str, Dict[str, Any]] = {} + self._nonce_expiry_seconds = nonce_expiry_seconds + + def generate_nonce(self, context: Optional[str] = None) -> str: + """ + Generate a unique nonce for a given context. + + Args: + context (str, optional): An optional context or identifier for the nonce. + Allows for more specific nonce tracking. + + Returns: + str: A unique nonce value. + """ + nonce = str(uuid.uuid4()) + current_time = time.time() + + nonce_entry = { + 'created_at': current_time, + 'context': context, + 'used': False + } + + self._nonce_store[nonce] = nonce_entry + self._cleanup_expired_nonces() + + return nonce + + def validate_nonce(self, nonce: str, context: Optional[str] = None) -> bool: + """ + Validate a nonce, checking its existence, expiry, and usage status. + + Args: + nonce (str): The nonce to validate. + context (str, optional): Optional context to match against the nonce. + + Returns: + bool: True if the nonce is valid, False otherwise. + """ + self._cleanup_expired_nonces() + + # Check if nonce exists + if nonce not in self._nonce_store: + return False + + nonce_entry = self._nonce_store[nonce] + + # Check context if provided + if context is not None and nonce_entry['context'] != context: + return False + + # Check if nonce has already been used + if nonce_entry['used']: + return False + + # Check nonce expiry + current_time = time.time() + if current_time - nonce_entry['created_at'] > self._nonce_expiry_seconds: + return False + + # Mark nonce as used + nonce_entry['used'] = True + return True + + def _cleanup_expired_nonces(self): + """ + Remove expired and used nonces from the nonce store. + """ + current_time = time.time() + expired_nonces = [ + nonce for nonce, entry in self._nonce_store.items() + if entry['used'] or + current_time - entry['created_at'] > self._nonce_expiry_seconds + ] + + for nonce in expired_nonces: + del self._nonce_store[nonce] \ No newline at end of file diff --git a/agent-framework/prometheus_swarm/tests/test_nonce_interface.py b/agent-framework/prometheus_swarm/tests/test_nonce_interface.py new file mode 100644 index 00000000..677db361 --- /dev/null +++ b/agent-framework/prometheus_swarm/tests/test_nonce_interface.py @@ -0,0 +1,74 @@ +import time +import pytest +from prometheus_swarm.nonce_interface import NonceRequestInterface + +def test_generate_nonce(): + """Test nonce generation creates unique nonces.""" + nonce_interface = NonceRequestInterface() + + nonce1 = nonce_interface.generate_nonce() + nonce2 = nonce_interface.generate_nonce() + + assert nonce1 != nonce2, "Generated nonces should be unique" + +def test_nonce_validation(): + """Test basic nonce validation.""" + nonce_interface = NonceRequestInterface() + + nonce = nonce_interface.generate_nonce() + assert nonce_interface.validate_nonce(nonce), "Valid nonce should be validated" + + # Second validation should fail (nonce can only be used once) + assert not nonce_interface.validate_nonce(nonce), "Used nonce should not be revalidated" + +def test_nonce_context(): + """Test nonce validation with context.""" + nonce_interface = NonceRequestInterface() + + nonce1 = nonce_interface.generate_nonce(context='login') + nonce2 = nonce_interface.generate_nonce(context='signup') + + # Validate with correct context + assert nonce_interface.validate_nonce(nonce1, context='login'), "Nonce with matching context should validate" + assert nonce_interface.validate_nonce(nonce2, context='signup'), "Nonce with matching context should validate" + + # Validate with incorrect context + assert not nonce_interface.validate_nonce(nonce1, context='signup'), "Nonce with mismatched context should not validate" + +def test_nonce_expiry(): + """Test nonce expiration.""" + nonce_interface = NonceRequestInterface(nonce_expiry_seconds=1) + + nonce = nonce_interface.generate_nonce() + + # Wait for nonce to expire + time.sleep(2) + + assert not nonce_interface.validate_nonce(nonce), "Expired nonce should not validate" + +def test_nonce_store_cleanup(): + """Test automatic cleanup of expired and used nonces.""" + nonce_interface = NonceRequestInterface(nonce_expiry_seconds=1) + + # Generate multiple nonces + nonces = [nonce_interface.generate_nonce() for _ in range(5)] + + # Use and validate some nonces + nonce_interface.validate_nonce(nonces[0]) + nonce_interface.validate_nonce(nonces[1]) + + # Wait for expiry + time.sleep(2) + + # Trigger internal cleanup + nonce = nonce_interface.generate_nonce() + + # Check that the nonce store has been cleaned + assert len(nonce_interface._nonce_store) <= 1, "Nonce store should be cleaned up after expiry" + +def test_invalid_nonce(): + """Test validation of non-existent nonce.""" + nonce_interface = NonceRequestInterface() + + invalid_nonce = 'non-existent-nonce' + assert not nonce_interface.validate_nonce(invalid_nonce), "Non-existent nonce should not validate" \ No newline at end of file