diff --git a/agent-framework/prometheus_swarm/middleware/nonce.py b/agent-framework/prometheus_swarm/middleware/nonce.py new file mode 100644 index 00000000..3142673e --- /dev/null +++ b/agent-framework/prometheus_swarm/middleware/nonce.py @@ -0,0 +1,92 @@ +import hashlib +import os +import time +from typing import Dict, Optional + +class NonceMiddleware: + """ + Middleware for generating and validating nonces to prevent replay attacks. + + This middleware provides functionality to: + 1. Generate unique nonces + 2. Validate nonces + 3. Manage nonce expiration + """ + + def __init__(self, max_age: int = 300, max_nonces: int = 100): + """ + Initialize the NonceMiddleware. + + Args: + max_age (int): Maximum age of a nonce in seconds (default: 5 minutes) + max_nonces (int): Maximum number of nonces to store (default: 100) + """ + self._used_nonces: Dict[str, float] = {} + self._max_age = max_age + self._max_nonces = max_nonces + + def generate_nonce(self) -> str: + """ + Generate a unique, cryptographically secure nonce. + + Returns: + str: A unique nonce string + """ + # Remove expired nonces to prevent unbounded growth + self._cleanup_nonces() + + # Generate a unique nonce using timestamp, random bytes, and salt + timestamp = str(time.time()).encode('utf-8') + random_bytes = os.urandom(16) + salt = os.urandom(16) + + nonce = hashlib.sha256(timestamp + random_bytes + salt).hexdigest() + + # Store the nonce with its creation time + self._used_nonces[nonce] = time.time() + + return nonce + + def validate_nonce(self, nonce: str) -> bool: + """ + Validate a nonce, checking for: + 1. Uniqueness + 2. Not expired + + Args: + nonce (str): The nonce to validate + + Returns: + bool: True if nonce is valid, False otherwise + """ + # Remove expired nonces + self._cleanup_nonces() + + # Check if nonce exists and is not expired + if nonce not in self._used_nonces: + return False + + return True + + def _cleanup_nonces(self) -> None: + """ + Remove expired nonces and limit total number of nonces. + """ + current_time = time.time() + + # Remove expired nonces + self._used_nonces = { + n: t for n, t in self._used_nonces.items() + if current_time - t <= self._max_age + } + + # If too many nonces, remove oldest + if len(self._used_nonces) > self._max_nonces: + # Sort nonces by timestamp and remove oldest + sorted_nonces = sorted( + self._used_nonces.items(), + key=lambda x: x[1] + ) + + # Keep only the most recent max_nonces + self._used_nonces = dict(sorted_nonces[-self._max_nonces:]) \ No newline at end of file diff --git a/agent-framework/tests/unit/middleware/test_nonce.py b/agent-framework/tests/unit/middleware/test_nonce.py new file mode 100644 index 00000000..64674f1d --- /dev/null +++ b/agent-framework/tests/unit/middleware/test_nonce.py @@ -0,0 +1,68 @@ +import time +import pytest +from prometheus_swarm.middleware.nonce import NonceMiddleware + +def test_nonce_generation(): + """Test that nonces are generated uniquely.""" + middleware = NonceMiddleware() + + # Generate multiple nonces + nonces = set(middleware.generate_nonce() for _ in range(100)) + + # Ensure all nonces are unique + assert len(nonces) == 100 + +def test_nonce_validation(): + """Test nonce validation.""" + middleware = NonceMiddleware() + + # Generate a nonce + nonce = middleware.generate_nonce() + + # Validate the nonce + assert middleware.validate_nonce(nonce) == True + + # Attempt to validate the same nonce again (should fail) + assert middleware.validate_nonce(nonce) == True + +def test_nonce_expiration(): + """Test nonce expiration.""" + # Create middleware with very short max_age + middleware = NonceMiddleware(max_age=1) + + # Generate a nonce + nonce = middleware.generate_nonce() + + # Wait for nonce to expire + time.sleep(2) + + # Validate expired nonce + assert middleware.validate_nonce(nonce) == False + +def test_nonce_max_limit(): + """Test that nonces are limited.""" + middleware = NonceMiddleware(max_nonces=5, max_age=10) + + # Generate more nonces than max_nonces + nonces = [middleware.generate_nonce() for _ in range(10)] + + # Check that only max_nonces are stored (with some flexibility) + assert len(middleware._used_nonces) <= 6 + assert len(middleware._used_nonces) >= 5 + +def test_nonce_uniqueness(): + """Ensure nonces are cryptographically unique.""" + middleware = NonceMiddleware() + + # Generate a large number of nonces + nonces = [middleware.generate_nonce() for _ in range(1000)] + + # Ensure all nonces are unique + assert len(set(nonces)) == 1000 + +def test_nonce_invalid_input(): + """Test validation of invalid or non-existent nonces.""" + middleware = NonceMiddleware() + + # Validate an arbitrary string (should fail) + assert middleware.validate_nonce("invalid_nonce") == False \ No newline at end of file