diff --git a/agent-framework/prometheus_swarm/utils/nonce_config.py b/agent-framework/prometheus_swarm/utils/nonce_config.py new file mode 100644 index 00000000..219ccacb --- /dev/null +++ b/agent-framework/prometheus_swarm/utils/nonce_config.py @@ -0,0 +1,103 @@ +import os +import hashlib +import secrets +import json +from typing import Dict, Optional + +class NonceConfigManager: + """ + Manages nonce configuration for secure token generation and validation. + + Provides methods to: + - Generate secure nonces + - Validate nonces + - Persist and load nonce configurations + """ + + def __init__(self, config_path: Optional[str] = None): + """ + Initialize NonceConfigManager with optional config file path. + + :param config_path: Path to nonce configuration file + """ + self.config_path = config_path or os.path.join(os.path.dirname(__file__), 'nonce_config.json') + self.nonce_config: Dict[str, str] = self._load_config() + + def _load_config(self) -> Dict[str, str]: + """ + Load nonce configuration from file. + + :return: Dictionary of nonce configurations + """ + try: + if os.path.exists(self.config_path): + with open(self.config_path, 'r') as f: + return json.load(f) + return {} + except (IOError, json.JSONDecodeError): + return {} + + def _save_config(self): + """ + Save current nonce configuration to file. + """ + try: + with open(self.config_path, 'w') as f: + json.dump(self.nonce_config, f, indent=2) + except IOError as e: + raise RuntimeError(f"Failed to save nonce configuration: {e}") + + def generate_nonce(self, identifier: str, length: int = 32) -> str: + """ + Generate a secure nonce for a given identifier. + + :param identifier: Unique identifier for the nonce + :param length: Length of the nonce in bytes (default: 32) + :return: Generated nonce as a hex string + """ + if not identifier: + raise ValueError("Identifier cannot be empty") + + # Ensure minimum length of 16 bytes + length = max(16, length) + + # Generate a cryptographically secure random nonce + nonce = secrets.token_hex(length) + + # Truncate or pad to ensure consistent length + nonce = nonce[:length * 2] + + # Store the nonce with a timestamp hash + self.nonce_config[identifier] = hashlib.sha256(nonce.encode()).hexdigest() + self._save_config() + + return nonce + + def validate_nonce(self, identifier: str, nonce: str) -> bool: + """ + Validate a nonce for a given identifier. + + :param identifier: Unique identifier for the nonce + :param nonce: Nonce to validate + :return: True if nonce is valid, False otherwise + """ + if not identifier or not nonce: + return False + + stored_hash = self.nonce_config.get(identifier) + if not stored_hash: + return False + + # Validate by comparing hash of input nonce + input_hash = hashlib.sha256(nonce.encode()).hexdigest() + return input_hash == stored_hash + + def clear_nonce(self, identifier: str): + """ + Clear a nonce for a given identifier. + + :param identifier: Unique identifier for the nonce + """ + if identifier in self.nonce_config: + del self.nonce_config[identifier] + self._save_config() \ No newline at end of file diff --git a/agent-framework/tests/unit/test_nonce_config.py b/agent-framework/tests/unit/test_nonce_config.py new file mode 100644 index 00000000..043b3fd4 --- /dev/null +++ b/agent-framework/tests/unit/test_nonce_config.py @@ -0,0 +1,107 @@ +import os +import tempfile +import pytest +from prometheus_swarm.utils.nonce_config import NonceConfigManager + +def test_nonce_generation_and_validation(): + """ + Test nonce generation and validation functionality. + """ + # Create a temporary config file + with tempfile.NamedTemporaryFile(delete=False) as temp_config: + temp_config_path = temp_config.name + + try: + # Initialize NonceConfigManager with temporary config + nonce_manager = NonceConfigManager(config_path=temp_config_path) + + # Test nonce generation + identifier = "test_user" + nonce = nonce_manager.generate_nonce(identifier) + + # Validate the generated nonce + assert nonce_manager.validate_nonce(identifier, nonce) is True + + # Validate with incorrect nonce + assert nonce_manager.validate_nonce(identifier, "wrong_nonce") is False + finally: + # Clean up the temporary config file + if os.path.exists(temp_config_path): + os.unlink(temp_config_path) + +def test_nonce_validation_edge_cases(): + """ + Test nonce validation edge cases. + """ + # Create a temporary config file + with tempfile.NamedTemporaryFile(delete=False) as temp_config: + temp_config_path = temp_config.name + + try: + nonce_manager = NonceConfigManager(config_path=temp_config_path) + + # Test empty inputs + assert nonce_manager.validate_nonce("", "") is False + assert nonce_manager.validate_nonce("user", "") is False + assert nonce_manager.validate_nonce("", "nonce") is False + + # Test validation of non-existent identifier + assert nonce_manager.validate_nonce("non_existent_user", "some_nonce") is False + finally: + # Clean up the temporary config file + if os.path.exists(temp_config_path): + os.unlink(temp_config_path) + +def test_nonce_generation_requirements(): + """ + Test nonce generation requirements and constraints. + """ + # Create a temporary config file + with tempfile.NamedTemporaryFile(delete=False) as temp_config: + temp_config_path = temp_config.name + + try: + nonce_manager = NonceConfigManager(config_path=temp_config_path) + + # Test invalid nonce generation + with pytest.raises(ValueError, match="Identifier cannot be empty"): + nonce_manager.generate_nonce("") + + # Test nonce generation with different lengths + nonce_short = nonce_manager.generate_nonce("user_short", length=16) + nonce_long = nonce_manager.generate_nonce("user_long", length=64) + + assert len(nonce_short) == 32 # Hex representation of 16 bytes + assert len(nonce_long) == 128 # Hex representation of 64 bytes + finally: + # Clean up the temporary config file + if os.path.exists(temp_config_path): + os.unlink(temp_config_path) + +def test_nonce_clear_functionality(): + """ + Test nonce clearing functionality. + """ + # Create a temporary config file + with tempfile.NamedTemporaryFile(delete=False) as temp_config: + temp_config_path = temp_config.name + + try: + nonce_manager = NonceConfigManager(config_path=temp_config_path) + + # Generate a nonce + identifier = "test_clear_user" + nonce = nonce_manager.generate_nonce(identifier) + + # Validate nonce before clearing + assert nonce_manager.validate_nonce(identifier, nonce) is True + + # Clear the nonce + nonce_manager.clear_nonce(identifier) + + # Validate that the nonce is no longer valid + assert nonce_manager.validate_nonce(identifier, nonce) is False + finally: + # Clean up the temporary config file + if os.path.exists(temp_config_path): + os.unlink(temp_config_path) \ No newline at end of file