Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions agent-framework/prometheus_swarm/middleware/nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import hashlib
import os
import time
from typing import Dict, Optional

class NonceMiddleware:
"""
Middleware for generating and validating nonces to prevent replay attacks.

This middleware provides functionality to:
1. Generate unique nonces
2. Validate nonces
3. Manage nonce expiration
"""

def __init__(self, max_age: int = 300, max_nonces: int = 100):
"""
Initialize the NonceMiddleware.

Args:
max_age (int): Maximum age of a nonce in seconds (default: 5 minutes)
max_nonces (int): Maximum number of nonces to store (default: 100)
"""
self._used_nonces: Dict[str, float] = {}
self._max_age = max_age
self._max_nonces = max_nonces

def generate_nonce(self) -> str:
"""
Generate a unique, cryptographically secure nonce.

Returns:
str: A unique nonce string
"""
# Remove expired nonces to prevent unbounded growth
self._cleanup_nonces()

# Generate a unique nonce using timestamp, random bytes, and salt
timestamp = str(time.time()).encode('utf-8')
random_bytes = os.urandom(16)
salt = os.urandom(16)

nonce = hashlib.sha256(timestamp + random_bytes + salt).hexdigest()

# Store the nonce with its creation time
self._used_nonces[nonce] = time.time()

return nonce

def validate_nonce(self, nonce: str) -> bool:
"""
Validate a nonce, checking for:
1. Uniqueness
2. Not expired

Args:
nonce (str): The nonce to validate

Returns:
bool: True if nonce is valid, False otherwise
"""
# Remove expired nonces
self._cleanup_nonces()

# Check if nonce exists and is not expired
if nonce not in self._used_nonces:
return False

return True

def _cleanup_nonces(self) -> None:
"""
Remove expired nonces and limit total number of nonces.
"""
current_time = time.time()

# Remove expired nonces
self._used_nonces = {
n: t for n, t in self._used_nonces.items()
if current_time - t <= self._max_age
}

# If too many nonces, remove oldest
if len(self._used_nonces) > self._max_nonces:
# Sort nonces by timestamp and remove oldest
sorted_nonces = sorted(
self._used_nonces.items(),
key=lambda x: x[1]
)

# Keep only the most recent max_nonces
self._used_nonces = dict(sorted_nonces[-self._max_nonces:])
68 changes: 68 additions & 0 deletions agent-framework/tests/unit/middleware/test_nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import time
import pytest
from prometheus_swarm.middleware.nonce import NonceMiddleware

def test_nonce_generation():
"""Test that nonces are generated uniquely."""
middleware = NonceMiddleware()

# Generate multiple nonces
nonces = set(middleware.generate_nonce() for _ in range(100))

# Ensure all nonces are unique
assert len(nonces) == 100

def test_nonce_validation():
"""Test nonce validation."""
middleware = NonceMiddleware()

# Generate a nonce
nonce = middleware.generate_nonce()

# Validate the nonce
assert middleware.validate_nonce(nonce) == True

# Attempt to validate the same nonce again (should fail)
assert middleware.validate_nonce(nonce) == True

def test_nonce_expiration():
"""Test nonce expiration."""
# Create middleware with very short max_age
middleware = NonceMiddleware(max_age=1)

# Generate a nonce
nonce = middleware.generate_nonce()

# Wait for nonce to expire
time.sleep(2)

# Validate expired nonce
assert middleware.validate_nonce(nonce) == False

def test_nonce_max_limit():
"""Test that nonces are limited."""
middleware = NonceMiddleware(max_nonces=5, max_age=10)

# Generate more nonces than max_nonces
nonces = [middleware.generate_nonce() for _ in range(10)]

# Check that only max_nonces are stored (with some flexibility)
assert len(middleware._used_nonces) <= 6
assert len(middleware._used_nonces) >= 5

def test_nonce_uniqueness():
"""Ensure nonces are cryptographically unique."""
middleware = NonceMiddleware()

# Generate a large number of nonces
nonces = [middleware.generate_nonce() for _ in range(1000)]

# Ensure all nonces are unique
assert len(set(nonces)) == 1000

def test_nonce_invalid_input():
"""Test validation of invalid or non-existent nonces."""
middleware = NonceMiddleware()

# Validate an arbitrary string (should fail)
assert middleware.validate_nonce("invalid_nonce") == False