Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions agent-framework/prometheus_swarm/utils/nonce_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import os
import hashlib
import secrets
import json
from typing import Dict, Optional

class NonceConfigManager:
"""
Manages nonce configuration for secure token generation and validation.

Provides methods to:
- Generate secure nonces
- Validate nonces
- Persist and load nonce configurations
"""

def __init__(self, config_path: Optional[str] = None):
"""
Initialize NonceConfigManager with optional config file path.

:param config_path: Path to nonce configuration file
"""
self.config_path = config_path or os.path.join(os.path.dirname(__file__), 'nonce_config.json')
self.nonce_config: Dict[str, str] = self._load_config()

def _load_config(self) -> Dict[str, str]:
"""
Load nonce configuration from file.

:return: Dictionary of nonce configurations
"""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
return {}
except (IOError, json.JSONDecodeError):
return {}

def _save_config(self):
"""
Save current nonce configuration to file.
"""
try:
with open(self.config_path, 'w') as f:
json.dump(self.nonce_config, f, indent=2)
except IOError as e:
raise RuntimeError(f"Failed to save nonce configuration: {e}")

def generate_nonce(self, identifier: str, length: int = 32) -> str:
"""
Generate a secure nonce for a given identifier.

:param identifier: Unique identifier for the nonce
:param length: Length of the nonce in bytes (default: 32)
:return: Generated nonce as a hex string
"""
if not identifier:
raise ValueError("Identifier cannot be empty")

# Ensure minimum length of 16 bytes
length = max(16, length)

# Generate a cryptographically secure random nonce
nonce = secrets.token_hex(length)

# Truncate or pad to ensure consistent length
nonce = nonce[:length * 2]

# Store the nonce with a timestamp hash
self.nonce_config[identifier] = hashlib.sha256(nonce.encode()).hexdigest()
self._save_config()

return nonce

def validate_nonce(self, identifier: str, nonce: str) -> bool:
"""
Validate a nonce for a given identifier.

:param identifier: Unique identifier for the nonce
:param nonce: Nonce to validate
:return: True if nonce is valid, False otherwise
"""
if not identifier or not nonce:
return False

stored_hash = self.nonce_config.get(identifier)
if not stored_hash:
return False

# Validate by comparing hash of input nonce
input_hash = hashlib.sha256(nonce.encode()).hexdigest()
return input_hash == stored_hash

def clear_nonce(self, identifier: str):
"""
Clear a nonce for a given identifier.

:param identifier: Unique identifier for the nonce
"""
if identifier in self.nonce_config:
del self.nonce_config[identifier]
self._save_config()
107 changes: 107 additions & 0 deletions agent-framework/tests/unit/test_nonce_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os
import tempfile
import pytest
from prometheus_swarm.utils.nonce_config import NonceConfigManager

def test_nonce_generation_and_validation():
"""
Test nonce generation and validation functionality.
"""
# Create a temporary config file
with tempfile.NamedTemporaryFile(delete=False) as temp_config:
temp_config_path = temp_config.name

try:
# Initialize NonceConfigManager with temporary config
nonce_manager = NonceConfigManager(config_path=temp_config_path)

# Test nonce generation
identifier = "test_user"
nonce = nonce_manager.generate_nonce(identifier)

# Validate the generated nonce
assert nonce_manager.validate_nonce(identifier, nonce) is True

# Validate with incorrect nonce
assert nonce_manager.validate_nonce(identifier, "wrong_nonce") is False
finally:
# Clean up the temporary config file
if os.path.exists(temp_config_path):
os.unlink(temp_config_path)

def test_nonce_validation_edge_cases():
"""
Test nonce validation edge cases.
"""
# Create a temporary config file
with tempfile.NamedTemporaryFile(delete=False) as temp_config:
temp_config_path = temp_config.name

try:
nonce_manager = NonceConfigManager(config_path=temp_config_path)

# Test empty inputs
assert nonce_manager.validate_nonce("", "") is False
assert nonce_manager.validate_nonce("user", "") is False
assert nonce_manager.validate_nonce("", "nonce") is False

# Test validation of non-existent identifier
assert nonce_manager.validate_nonce("non_existent_user", "some_nonce") is False
finally:
# Clean up the temporary config file
if os.path.exists(temp_config_path):
os.unlink(temp_config_path)

def test_nonce_generation_requirements():
"""
Test nonce generation requirements and constraints.
"""
# Create a temporary config file
with tempfile.NamedTemporaryFile(delete=False) as temp_config:
temp_config_path = temp_config.name

try:
nonce_manager = NonceConfigManager(config_path=temp_config_path)

# Test invalid nonce generation
with pytest.raises(ValueError, match="Identifier cannot be empty"):
nonce_manager.generate_nonce("")

# Test nonce generation with different lengths
nonce_short = nonce_manager.generate_nonce("user_short", length=16)
nonce_long = nonce_manager.generate_nonce("user_long", length=64)

assert len(nonce_short) == 32 # Hex representation of 16 bytes
assert len(nonce_long) == 128 # Hex representation of 64 bytes
finally:
# Clean up the temporary config file
if os.path.exists(temp_config_path):
os.unlink(temp_config_path)

def test_nonce_clear_functionality():
"""
Test nonce clearing functionality.
"""
# Create a temporary config file
with tempfile.NamedTemporaryFile(delete=False) as temp_config:
temp_config_path = temp_config.name

try:
nonce_manager = NonceConfigManager(config_path=temp_config_path)

# Generate a nonce
identifier = "test_clear_user"
nonce = nonce_manager.generate_nonce(identifier)

# Validate nonce before clearing
assert nonce_manager.validate_nonce(identifier, nonce) is True

# Clear the nonce
nonce_manager.clear_nonce(identifier)

# Validate that the nonce is no longer valid
assert nonce_manager.validate_nonce(identifier, nonce) is False
finally:
# Clean up the temporary config file
if os.path.exists(temp_config_path):
os.unlink(temp_config_path)