Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions agent-framework/prometheus_swarm/utils/nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Nonce Configuration Management Module

This module provides utilities for generating, managing, and validating nonces
to prevent replay attacks and ensure request uniqueness.
"""

import hashlib
import time
import uuid
from typing import Dict, Optional


class NonceManager:
"""
Manages nonce generation, tracking, and validation.

Attributes:
_used_nonces (Dict[str, float]): Tracks used nonces and their timestamps
_nonce_expiry_seconds (int): Expiration time for nonces
"""

def __init__(self, nonce_expiry_seconds: int = 300):
"""
Initialize NonceManager.

Args:
nonce_expiry_seconds (int, optional): Nonce expiration time. Defaults to 300 seconds.
"""
self._used_nonces: Dict[str, float] = {}
self._nonce_expiry_seconds = nonce_expiry_seconds

def generate_nonce(self) -> str:
"""
Generate a unique, secure nonce.

Returns:
str: A unique nonce string
"""
unique_input = f"{uuid.uuid4()}{time.time()}"
return hashlib.sha256(unique_input.encode()).hexdigest()

def validate_nonce(self, nonce: str) -> bool:
"""
Validate a nonce, checking for uniqueness and expiration.

Args:
nonce (str): Nonce to validate

Returns:
bool: Whether the nonce is valid and can be used
"""
current_time = time.time()

# Remove expired nonces
self._cleanup_expired_nonces(current_time)

# Check if nonce has been used
if nonce in self._used_nonces:
return False

# Mark nonce as used
self._used_nonces[nonce] = current_time
return True

def _cleanup_expired_nonces(self, current_time: float) -> None:
"""
Remove nonces that have exceeded the expiration time.

Args:
current_time (float): Current timestamp
"""
expired_nonces = [
n for n, timestamp in self._used_nonces.items()
if current_time - timestamp > self._nonce_expiry_seconds
]
for nonce in expired_nonces:
del self._used_nonces[nonce]
79 changes: 79 additions & 0 deletions agent-framework/tests/unit/test_nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""
Unit tests for Nonce Configuration Management
"""

import time
import pytest
from prometheus_swarm.utils.nonce import NonceManager


def test_nonce_generation():
"""Test that nonce generation returns unique values."""
nonce_manager = NonceManager()
nonce1 = nonce_manager.generate_nonce()
nonce2 = nonce_manager.generate_nonce()

assert nonce1 != nonce2
assert len(nonce1) == 64 # SHA-256 produces 64-character hex string
assert isinstance(nonce1, str)


def test_nonce_validation():
"""Test nonce validation logic."""
nonce_manager = NonceManager()
nonce = nonce_manager.generate_nonce()

# First validation should succeed
assert nonce_manager.validate_nonce(nonce) is True

# Second validation of same nonce should fail
assert nonce_manager.validate_nonce(nonce) is False


def test_nonce_expiration():
"""Test nonce expiration mechanism."""
class MockNonceManager(NonceManager):
def __init__(self, nonce_expiry_seconds):
super().__init__(nonce_expiry_seconds)

# Create a nonce manager with very short expiry
nonce_manager = MockNonceManager(nonce_expiry_seconds=1)
nonce = nonce_manager.generate_nonce()

# Wait for nonce to expire
time.sleep(1.1)

# Nonce should now be re-usable
assert nonce_manager.validate_nonce(nonce) is True


def test_multiple_nonce_tracking():
"""Test tracking multiple unique nonces."""
nonce_manager = NonceManager()
nonces = [nonce_manager.generate_nonce() for _ in range(10)]

# All nonces should be valid on first validation
validation_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces]
assert all(validation_results)

# None should be valid on second validation
validation_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces]
assert not any(validation_results)


def test_nonce_cleanup():
"""Test internal nonce cleanup mechanism."""
nonce_manager = NonceManager(nonce_expiry_seconds=1)

# Generate some nonces
nonces = [nonce_manager.generate_nonce() for _ in range(5)]

# Wait for expiry
time.sleep(1.1)

# Trigger cleanup by validating a new nonce
new_nonce = nonce_manager.generate_nonce()

# All previous nonces should now be re-usable
reusable_results = [nonce_manager.validate_nonce(nonce) for nonce in nonces]
assert all(reusable_results)