diff --git a/agent-framework/prometheus_swarm/__init__.py b/agent-framework/prometheus_swarm/__init__.py index e69de29b..bf4bff79 100644 --- a/agent-framework/prometheus_swarm/__init__.py +++ b/agent-framework/prometheus_swarm/__init__.py @@ -0,0 +1,3 @@ +from .nonce_request import NonceRequest + +__all__ = ['NonceRequest'] \ No newline at end of file diff --git a/agent-framework/prometheus_swarm/nonce_request.py b/agent-framework/prometheus_swarm/nonce_request.py new file mode 100644 index 00000000..c56e7ee4 --- /dev/null +++ b/agent-framework/prometheus_swarm/nonce_request.py @@ -0,0 +1,105 @@ +from typing import Dict, Any, Optional +import uuid +import time +from datetime import datetime, timedelta + +class NonceRequest: + """ + A class to manage cryptographic nonce requests with expiration and validation. + + Nonces are one-time tokens used to prevent replay attacks and ensure request uniqueness. + """ + + def __init__(self, + duration: int = 300, # Default 5 minutes + allow_reuse: bool = False): + """ + Initialize a NonceRequest manager. + + Args: + duration (int): How long a nonce remains valid in seconds. Defaults to 300 (5 minutes). + allow_reuse (bool): Whether nonces can be reused. Defaults to False. + """ + self._nonces: Dict[str, Dict[str, Any]] = {} + self._duration = duration + self._allow_reuse = allow_reuse + + def generate_nonce(self, context: Optional[Dict[str, Any]] = None) -> str: + """ + Generate a new unique nonce. + + Args: + context (dict, optional): Additional context to associate with the nonce. + + Returns: + str: A unique nonce token + """ + nonce = str(uuid.uuid4()) + expiration = datetime.now() + timedelta(seconds=self._duration) + + self._nonces[nonce] = { + 'created_at': datetime.now(), + 'expires_at': expiration, + 'context': context or {}, + 'used': False + } + + return nonce + + def validate_nonce(self, nonce: str) -> bool: + """ + Validate a given nonce. + + Args: + nonce (str): The nonce to validate + + Returns: + bool: Whether the nonce is valid and usable + """ + if nonce not in self._nonces: + return False + + nonce_info = self._nonces[nonce] + + # Check expiration + if datetime.now() > nonce_info['expires_at']: + return False + + # Check reuse + if not self._allow_reuse and nonce_info['used']: + return False + + # Mark as used + nonce_info['used'] = True + + return True + + def get_nonce_context(self, nonce: str) -> Optional[Dict[str, Any]]: + """ + Retrieve the context associated with a nonce. + + Args: + nonce (str): The nonce to retrieve context for + + Returns: + dict or None: The context associated with the nonce, or None if not found + """ + return self._nonces.get(nonce, {}).get('context', None) + + def clear_expired_nonces(self) -> int: + """ + Remove expired nonces from the internal storage. + + Returns: + int: Number of nonces removed + """ + current_time = datetime.now() + expired_nonces = [ + nonce for nonce, info in self._nonces.items() + if current_time > info['expires_at'] + ] + + for nonce in expired_nonces: + del self._nonces[nonce] + + return len(expired_nonces) \ No newline at end of file diff --git a/agent-framework/prometheus_swarm/tests/test_nonce_request.py b/agent-framework/prometheus_swarm/tests/test_nonce_request.py new file mode 100644 index 00000000..ef488b48 --- /dev/null +++ b/agent-framework/prometheus_swarm/tests/test_nonce_request.py @@ -0,0 +1,64 @@ +import pytest +import time +from datetime import datetime, timedelta +from prometheus_swarm.nonce_request import NonceRequest + +def test_generate_nonce(): + """Test that generate_nonce produces a unique nonce.""" + nonce_manager = NonceRequest() + nonce1 = nonce_manager.generate_nonce() + nonce2 = nonce_manager.generate_nonce() + + assert nonce1 != nonce2 + assert len(nonce1) > 0 + +def test_nonce_validation(): + """Test basic nonce validation.""" + nonce_manager = NonceRequest(duration=5) # 5 seconds + nonce = nonce_manager.generate_nonce() + + assert nonce_manager.validate_nonce(nonce) is True + assert nonce_manager.validate_nonce(nonce) is False # No reuse by default + +def test_nonce_expiration(): + """Test nonce expiration.""" + nonce_manager = NonceRequest(duration=1) # 1 second + nonce = nonce_manager.generate_nonce() + + time.sleep(2) # Wait beyond expiration + assert nonce_manager.validate_nonce(nonce) is False + +def test_nonce_context(): + """Test attaching and retrieving nonce context.""" + nonce_manager = NonceRequest() + context = {"user_id": 123, "action": "login"} + nonce = nonce_manager.generate_nonce(context) + + retrieved_context = nonce_manager.get_nonce_context(nonce) + assert retrieved_context == context + +def test_nonce_reuse_allowed(): + """Test nonce when reuse is explicitly allowed.""" + nonce_manager = NonceRequest(duration=5, allow_reuse=True) + nonce = nonce_manager.generate_nonce() + + assert nonce_manager.validate_nonce(nonce) is True + assert nonce_manager.validate_nonce(nonce) is True + +def test_clear_expired_nonces(): + """Test clearing expired nonces.""" + nonce_manager = NonceRequest(duration=1) + + # Generate a few nonces + nonce1 = nonce_manager.generate_nonce() + nonce2 = nonce_manager.generate_nonce() + + time.sleep(2) # Wait beyond expiration + + cleared_count = nonce_manager.clear_expired_nonces() + assert cleared_count == 2 + +def test_invalid_nonce(): + """Test validation of an invalid/non-existent nonce.""" + nonce_manager = NonceRequest() + assert nonce_manager.validate_nonce("invalid_nonce") is False \ No newline at end of file