Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions agent-framework/prometheus_swarm/utils/nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import uuid
import time
import hashlib
from typing import Dict, Any, Optional

class NonceRequestInterface:
"""
A class to manage nonce (number used once) requests for secure communication.

The nonce is a unique, one-time use value that helps prevent replay attacks
and ensures the uniqueness of each request.
"""

def __init__(self, max_age_seconds: int = 300):
"""
Initialize the Nonce Request Interface.

:param max_age_seconds: Maximum time a nonce is considered valid (default 5 minutes)
"""
self._nonce_store: Dict[str, Dict[str, Any]] = {}
self._max_age_seconds = max_age_seconds

def generate_nonce(self, context: Optional[str] = None) -> str:
"""
Generate a unique nonce.

:param context: Optional context for the nonce (e.g., user ID, request type)
:return: A unique nonce string
"""
# Generate a UUID-based nonce
nonce = str(uuid.uuid4())

# Store nonce details
self._nonce_store[nonce] = {
'created_at': time.time(),
'context': context,
'used': False
}

return nonce

def validate_nonce(self, nonce: str, context: Optional[str] = None) -> bool:
"""
Validate a nonce.

:param nonce: Nonce to validate
:param context: Optional context to match against
:return: True if nonce is valid, False otherwise
"""
# Check if nonce exists
if nonce not in self._nonce_store:
return False

# Get nonce details
nonce_details = self._nonce_store[nonce]

# Check if nonce has already been used
if nonce_details['used']:
return False

# Check nonce age
current_time = time.time()
if current_time - nonce_details['created_at'] > self._max_age_seconds:
del self._nonce_store[nonce]
return False

# Check context if provided
if context is not None and nonce_details['context'] != context:
return False

# Mark nonce as used
nonce_details['used'] = True

return True

def clear_expired_nonces(self) -> int:
"""
Clear expired nonces from the store.

:return: Number of nonces removed
"""
current_time = time.time()
expired_nonces = [
nonce for nonce, details in self._nonce_store.items()
if details['used'] or current_time - details['created_at'] > self._max_age_seconds
]

for nonce in expired_nonces:
del self._nonce_store[nonce]

return len(expired_nonces)
79 changes: 79 additions & 0 deletions agent-framework/tests/unit/test_nonce_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import time
import pytest
from prometheus_swarm.utils.nonce import NonceRequestInterface

def test_generate_nonce():
"""Test generating a unique nonce"""
nonce_interface = NonceRequestInterface()
nonce1 = nonce_interface.generate_nonce()
nonce2 = nonce_interface.generate_nonce()

assert nonce1 != nonce2
assert isinstance(nonce1, str)
assert len(nonce1) > 0

def test_validate_nonce():
"""Test validating a generated nonce"""
nonce_interface = NonceRequestInterface()
nonce = nonce_interface.generate_nonce()

assert nonce_interface.validate_nonce(nonce) is True
assert nonce_interface.validate_nonce(nonce) is False # Cannot reuse nonce

def test_nonce_context():
"""Test nonce validation with context"""
nonce_interface = NonceRequestInterface()
nonce1 = nonce_interface.generate_nonce(context='user1')
nonce2 = nonce_interface.generate_nonce(context='user2')

assert nonce_interface.validate_nonce(nonce1, context='user1') is True
assert nonce_interface.validate_nonce(nonce1, context='user2') is False
assert nonce_interface.validate_nonce(nonce2, context='user1') is False

def test_nonce_expiration():
"""Test nonce expiration"""
nonce_interface = NonceRequestInterface(max_age_seconds=1)
nonce = nonce_interface.generate_nonce()

time.sleep(2) # Wait for nonce to expire
assert nonce_interface.validate_nonce(nonce) is False

def test_clear_expired_nonces():
"""Test clearing expired nonces"""
nonce_interface = NonceRequestInterface(max_age_seconds=1)

# Generate multiple nonces
nonces = [nonce_interface.generate_nonce() for _ in range(5)]

# Use some nonces
nonce_interface.validate_nonce(nonces[0])
nonce_interface.validate_nonce(nonces[1])

time.sleep(2) # Wait for nonces to expire

# Clear expired nonces
cleared_count = nonce_interface.clear_expired_nonces()
assert cleared_count > 0

def test_invalid_nonce():
"""Test validation of an invalid nonce"""
nonce_interface = NonceRequestInterface()
invalid_nonce = 'not-a-real-nonce'

assert nonce_interface.validate_nonce(invalid_nonce) is False

def test_nonce_thread_safety(mocker):
"""Test basic thread safety of nonce generation"""
nonce_interface = NonceRequestInterface()

# Simulate multiple threads generating nonces
mock_threads = [
mocker.Mock(name=f'thread_{i}') for i in range(10)
]

nonces = set()
for thread in mock_threads:
nonce = nonce_interface.generate_nonce()
nonces.add(nonce)

assert len(nonces) == len(mock_threads)