Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions agent-framework/prometheus_swarm/utils/nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import uuid
import time
from typing import Dict, Any
from threading import Lock

class NonceError(Exception):
"""Base exception for nonce-related errors."""
pass

class NonceAlreadyUsedError(NonceError):
"""Raised when a nonce has already been used."""
pass

class NonceExpiredError(NonceError):
"""Raised when a nonce has expired."""
pass

class NonceManager:
"""
Manages nonce generation, validation, and tracking.

Provides thread-safe nonce management with expiration and uniqueness checks.
"""
def __init__(self, max_nonces: int = 1000, nonce_expiry: int = 300):
"""
Initialize the NonceManager.

Args:
max_nonces (int): Maximum number of nonces to track. Default is 1000.
nonce_expiry (int): Nonce expiration time in seconds. Default is 5 minutes.
"""
self._generated_nonces: Dict[str, float] = {}
self._lock = Lock()
self._max_nonces = max_nonces
self._nonce_expiry = nonce_expiry

def generate_nonce(self) -> str:
"""
Generate a unique nonce.

Returns:
str: A unique nonce value.
"""
with self._lock:
# Cleanup expired nonces
self._cleanup_expired_nonces()

# Generate a unique nonce
nonce = str(uuid.uuid4())
current_time = time.time()
self._generated_nonces[nonce] = current_time

return nonce

def validate_nonce(self, nonce: str, consume: bool = True) -> bool:
"""
Validate a nonce, checking for uniqueness and expiration.

Args:
nonce (str): The nonce to validate.
consume (bool): Whether to remove the nonce after validation. Default is True.

Raises:
NonceAlreadyUsedError: If the nonce is not found or has been validated.

Returns:
bool: True if the nonce is valid and has not been previously validated.
"""
with self._lock:
# Cleanup expired nonces
self._cleanup_expired_nonces()

# Check if nonce is present and not expired
if nonce not in self._generated_nonces:
raise NonceAlreadyUsedError(f"Nonce {nonce} is not valid or has been used.")

# If consume is True, remove the nonce
if consume:
del self._generated_nonces[nonce]

return True

def _cleanup_expired_nonces(self):
"""
Remove expired nonces from the tracking dictionary.
"""
current_time = time.time()
expired_nonces = [
nonce for nonce, timestamp in self._generated_nonces.items()
if current_time - timestamp > self._nonce_expiry
]

# Remove expired nonces
for nonce in expired_nonces:
del self._generated_nonces[nonce]

# Limit the number of tracked nonces
if len(self._generated_nonces) > self._max_nonces:
# Remove the oldest nonces first
oldest_nonces = sorted(
self._generated_nonces.items(),
key=lambda x: x[1]
)[:len(self._generated_nonces) - self._max_nonces]

for nonce, _ in oldest_nonces:
del self._generated_nonces[nonce]

# Global singleton instance for easy import and use
nonce_manager = NonceManager()
83 changes: 83 additions & 0 deletions agent-framework/tests/unit/utils/test_nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import time
import pytest
from concurrent.futures import ThreadPoolExecutor, as_completed
from prometheus_swarm.utils.nonce import NonceManager, NonceAlreadyUsedError, NonceExpiredError

def test_nonce_generation():
"""Test that nonces are generated uniquely."""
nonce_manager = NonceManager()
nonce1 = nonce_manager.generate_nonce()
nonce2 = nonce_manager.generate_nonce()

assert nonce1 != nonce2, "Nonces should be unique"

def test_nonce_validation():
"""Test nonce validation process."""
nonce_manager = NonceManager()

# Generate and validate a nonce
nonce = nonce_manager.generate_nonce()
assert nonce_manager.validate_nonce(nonce) is True

# Attempting to validate the same nonce again should raise an error
with pytest.raises(NonceAlreadyUsedError):
nonce_manager.validate_nonce(nonce)

def test_nonce_validation_without_consume():
"""Test nonce validation without consuming the nonce."""
nonce_manager = NonceManager()

# Generate a nonce
nonce = nonce_manager.generate_nonce()

# Validate without consuming first
assert nonce_manager.validate_nonce(nonce, consume=False) is True

# Second validation without consume allowed
assert nonce_manager.validate_nonce(nonce, consume=False) is True

# Explicit consume after allows subsequent consume to fail
nonce_manager.validate_nonce(nonce)

with pytest.raises(NonceAlreadyUsedError):
nonce_manager.validate_nonce(nonce)

def test_nonce_expiration():
"""Test nonce expiration mechanism."""
# Create a nonce manager with very short expiry for testing
nonce_manager = NonceManager(nonce_expiry=1)

nonce = nonce_manager.generate_nonce()

# Wait for nonce to expire
time.sleep(2)

# Validate should no longer return the same nonce
with pytest.raises(NonceAlreadyUsedError):
nonce_manager.validate_nonce(nonce)

def test_nonce_max_limit():
"""Test that nonce tracking respects the maximum limit."""
nonce_manager = NonceManager(max_nonces=3)

# Generate more nonces than the max limit
nonces = [nonce_manager.generate_nonce() for _ in range(5)]

# Validate the 3 newest nonces
for nonce in nonces[2:]:
assert nonce_manager.validate_nonce(nonce) is True

def test_thread_safety():
"""Test thread safety of nonce manager."""
nonce_manager = NonceManager()

def generate_and_validate_nonce():
nonce = nonce_manager.generate_nonce()
return nonce_manager.validate_nonce(nonce)

with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(generate_and_validate_nonce) for _ in range(100)]

# Collect results ensuring no exceptions
results = [future.result() for future in as_completed(futures)]
assert len(results) == 100, "All nonce generations should succeed"