Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions agent-framework/prometheus_swarm/nonce_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import uuid
import time
from typing import Dict, Optional, Any

class NonceRequestInterface:
"""
A class to manage nonce (number used once) generation and validation.

Attributes:
_nonce_store (Dict[str, Dict[str, Any]]): A store to track generated nonces.
_nonce_expiry_seconds (int): Duration for which a nonce remains valid.
"""

def __init__(self, nonce_expiry_seconds: int = 300):
"""
Initialize the NonceRequestInterface.

Args:
nonce_expiry_seconds (int, optional): Time in seconds after which a nonce expires.
Defaults to 300 seconds (5 minutes).
"""
self._nonce_store: Dict[str, Dict[str, Any]] = {}
self._nonce_expiry_seconds = nonce_expiry_seconds

def generate_nonce(self, context: Optional[str] = None) -> str:
"""
Generate a unique nonce for a given context.

Args:
context (str, optional): An optional context or identifier for the nonce.
Allows for more specific nonce tracking.

Returns:
str: A unique nonce value.
"""
nonce = str(uuid.uuid4())
current_time = time.time()

nonce_entry = {
'created_at': current_time,
'context': context,
'used': False
}

self._nonce_store[nonce] = nonce_entry
self._cleanup_expired_nonces()

return nonce

def validate_nonce(self, nonce: str, context: Optional[str] = None) -> bool:
"""
Validate a nonce, checking its existence, expiry, and usage status.

Args:
nonce (str): The nonce to validate.
context (str, optional): Optional context to match against the nonce.

Returns:
bool: True if the nonce is valid, False otherwise.
"""
self._cleanup_expired_nonces()

# Check if nonce exists
if nonce not in self._nonce_store:
return False

nonce_entry = self._nonce_store[nonce]

# Check context if provided
if context is not None and nonce_entry['context'] != context:
return False

# Check if nonce has already been used
if nonce_entry['used']:
return False

# Check nonce expiry
current_time = time.time()
if current_time - nonce_entry['created_at'] > self._nonce_expiry_seconds:
return False

# Mark nonce as used
nonce_entry['used'] = True
return True

def _cleanup_expired_nonces(self):
"""
Remove expired and used nonces from the nonce store.
"""
current_time = time.time()
expired_nonces = [
nonce for nonce, entry in self._nonce_store.items()
if entry['used'] or
current_time - entry['created_at'] > self._nonce_expiry_seconds
]

for nonce in expired_nonces:
del self._nonce_store[nonce]
74 changes: 74 additions & 0 deletions agent-framework/prometheus_swarm/tests/test_nonce_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import time
import pytest
from prometheus_swarm.nonce_interface import NonceRequestInterface

def test_generate_nonce():
"""Test nonce generation creates unique nonces."""
nonce_interface = NonceRequestInterface()

nonce1 = nonce_interface.generate_nonce()
nonce2 = nonce_interface.generate_nonce()

assert nonce1 != nonce2, "Generated nonces should be unique"

def test_nonce_validation():
"""Test basic nonce validation."""
nonce_interface = NonceRequestInterface()

nonce = nonce_interface.generate_nonce()
assert nonce_interface.validate_nonce(nonce), "Valid nonce should be validated"

# Second validation should fail (nonce can only be used once)
assert not nonce_interface.validate_nonce(nonce), "Used nonce should not be revalidated"

def test_nonce_context():
"""Test nonce validation with context."""
nonce_interface = NonceRequestInterface()

nonce1 = nonce_interface.generate_nonce(context='login')
nonce2 = nonce_interface.generate_nonce(context='signup')

# Validate with correct context
assert nonce_interface.validate_nonce(nonce1, context='login'), "Nonce with matching context should validate"
assert nonce_interface.validate_nonce(nonce2, context='signup'), "Nonce with matching context should validate"

# Validate with incorrect context
assert not nonce_interface.validate_nonce(nonce1, context='signup'), "Nonce with mismatched context should not validate"

def test_nonce_expiry():
"""Test nonce expiration."""
nonce_interface = NonceRequestInterface(nonce_expiry_seconds=1)

nonce = nonce_interface.generate_nonce()

# Wait for nonce to expire
time.sleep(2)

assert not nonce_interface.validate_nonce(nonce), "Expired nonce should not validate"

def test_nonce_store_cleanup():
"""Test automatic cleanup of expired and used nonces."""
nonce_interface = NonceRequestInterface(nonce_expiry_seconds=1)

# Generate multiple nonces
nonces = [nonce_interface.generate_nonce() for _ in range(5)]

# Use and validate some nonces
nonce_interface.validate_nonce(nonces[0])
nonce_interface.validate_nonce(nonces[1])

# Wait for expiry
time.sleep(2)

# Trigger internal cleanup
nonce = nonce_interface.generate_nonce()

# Check that the nonce store has been cleaned
assert len(nonce_interface._nonce_store) <= 1, "Nonce store should be cleaned up after expiry"

def test_invalid_nonce():
"""Test validation of non-existent nonce."""
nonce_interface = NonceRequestInterface()

invalid_nonce = 'non-existent-nonce'
assert not nonce_interface.validate_nonce(invalid_nonce), "Non-existent nonce should not validate"