diff --git a/robustness_experiment_box/database/epsilon_value_result.py b/robustness_experiment_box/database/epsilon_value_result.py index 616853a3..69da928b 100644 --- a/robustness_experiment_box/database/epsilon_value_result.py +++ b/robustness_experiment_box/database/epsilon_value_result.py @@ -7,6 +7,14 @@ class EpsilonValueResult: """ A dataclass defining the verification result of a single verification. + + This class is designed for traditional deterministic verification approaches + where we search for specific epsilon values. For probabilistic certification + (like randomized smoothing), use ProbabilisticCertificationResult directly. + + For traditional verification: + - epsilon: The certified epsilon value + - smallest_sat_value: The smallest epsilon where SAT was found """ verification_context: VerificationContext @@ -22,11 +30,10 @@ def to_dict(self) -> dict: Returns: dict: The dictionary representation of the EpsilonValueResult. """ - ret = dict( + return dict( **self.verification_context.get_dict_for_epsilon_result(), epsilon_value=self.epsilon, smallest_sat_value=self.smallest_sat_value, total_time=self.time, verifier=self.verifier, ) - return ret diff --git a/robustness_experiment_box/database/verification_result.py b/robustness_experiment_box/database/verification_result.py index 30e61103..585151a2 100644 --- a/robustness_experiment_box/database/verification_result.py +++ b/robustness_experiment_box/database/verification_result.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from enum import Enum @@ -10,3 +11,72 @@ class VerificationResult(str, Enum): SAT = "SAT" TIMEOUT = "TIMEOUT" ERROR = "ERR" + + +@dataclass +class ProbabilisticCertificationResult: + """ + Result class for probabilistic certification methods like randomized smoothing. + + This provides certified robustness guarantees obtained through Monte Carlo sampling + with Gaussian noise, following the approach of Cohen et al. (2019). + + Note: Unlike traditional verification, probabilistic certification provides + a certified radius rather than binary SAT/UNSAT/TIMEOUT/ERROR results. The certified radius + may be 0 if confidence is insufficient (ABSTAIN in Cohen et al.). + + Attributes: + predicted_class: The class predicted by the smoothed classifier + certified_radius: The L2 radius within which the prediction is guaranteed to be constant + confidence: The statistical confidence level (1 - alpha) + n0: Number of samples used for initial prediction + n: Number of samples used for certification + sigma: Noise level used for smoothing + certification_time: Time taken for the certification process + """ + + predicted_class: int + certified_radius: float + confidence: float # 1 - alpha + n0: int + n: int + sigma: float + certification_time: float + + def to_dict(self) -> dict: + """ + Convert the probabilistic certification result to a dictionary. + + Returns: + Dictionary representation of the result + """ + return { + "predicted_class": self.predicted_class, + "certified_radius": self.certified_radius, + "confidence": self.confidence, + "n0": self.n0, + "n": self.n, + "sigma": self.sigma, + "certification_time": self.certification_time, + } + + @classmethod + def from_dict(cls, data: dict) -> "ProbabilisticCertificationResult": + """ + Create a ProbabilisticCertificationResult from a dictionary. + + Args: + data: Dictionary containing the result data + + Returns: + ProbabilisticCertificationResult instance + """ + return cls( + predicted_class=data["predicted_class"], + certified_radius=data["certified_radius"], + confidence=data["confidence"], + n0=data["n0"], + n=data["n"], + sigma=data["sigma"], + certification_time=data["certification_time"], + ) diff --git a/robustness_experiment_box/epsilon_value_estimator/randomized_smoothing_estimator.py b/robustness_experiment_box/epsilon_value_estimator/randomized_smoothing_estimator.py new file mode 100644 index 00000000..d341818a --- /dev/null +++ b/robustness_experiment_box/epsilon_value_estimator/randomized_smoothing_estimator.py @@ -0,0 +1,117 @@ +""" +Randomized Smoothing Estimator for VERONA. + +This estimator provides direct computation of certified radii using randomized smoothing, +eliminating the need for binary search over epsilon values. It works with the +RandomizedSmoothingModule to provide probabilistic robustness guarantees. + +This approach is more efficient than binary search for probabilistic certification +since it directly computes the certified radius rather than searching for boundaries. +""" + +import logging + +from robustness_experiment_box.database.epsilon_value_result import EpsilonValueResult +from robustness_experiment_box.database.verification_context import VerificationContext +from robustness_experiment_box.database.verification_result import ProbabilisticCertificationResult +from robustness_experiment_box.epsilon_value_estimator.epsilon_value_estimator import EpsilonValueEstimator +from robustness_experiment_box.verification_module.randomized_smoothing_module import RandomizedSmoothingModule + +logger = logging.getLogger(__name__) + + +class RandomizedSmoothingEstimator(EpsilonValueEstimator): + """ + Estimator for probabilistic robustness using randomized smoothing. + + This estimator directly computes certified radii using Monte Carlo sampling + rather than performing binary search over epsilon values. It is + designed to work with the RandomizedSmoothingModule. + + Args: + smoothing_module: The randomized smoothing verification module + n0: Number of samples for initial prediction + n: Number of samples for certification + alpha: Confidence level (failure probability) + batch_size: Batch size for sampling + """ + + def __init__( + self, + smoothing_module: RandomizedSmoothingModule, + n0: int = 100, + n: int = 100000, + alpha: float = 0.001, + batch_size: int = 1000 + ): + # Pass empty epsilon list since we don't use binary search + super().__init__([], smoothing_module) + + self.smoothing_module = smoothing_module + self.n0 = n0 + self.n = n + self.alpha = alpha + self.batch_size = batch_size + + def compute_epsilon_value(self, verification_context: VerificationContext) -> EpsilonValueResult: + """ + Compute the certified epsilon value using randomized smoothing. + + This method performs direct computation of the certified radius. + + Args: + verification_context: The verification context + + Returns: + EpsilonValueResult with the certified radius as epsilon value + """ + # Perform probabilistic verification + probabilistic_result = self.smoothing_module.verify_probabilistic( + verification_context=verification_context, + n0=self.n0, + n=self.n, + alpha=self.alpha, + sigma=self.smoothing_module.sigma, + batch_size=self.batch_size + ) + + # Convert to traditional EpsilonValueResult format + # Use certified_radius as the epsilon value for compatibility + epsilon_value_result = EpsilonValueResult( + verification_context=verification_context, + epsilon=probabilistic_result.certified_radius, + smallest_sat_value=probabilistic_result.certified_radius, + time=probabilistic_result.certification_time, + verifier=self.smoothing_module.name + ) + + logger.info( + f"Randomized smoothing result: predicted_class={probabilistic_result.predicted_class}, " + f"certified_radius={probabilistic_result.certified_radius:.4f}, " + f"confidence={probabilistic_result.confidence:.4f}, " + f"time={probabilistic_result.certification_time:.2f}s" + ) + + return epsilon_value_result + + def get_probabilistic_result(self, verification_context: VerificationContext) -> ProbabilisticCertificationResult: + """ + Get the full probabilistic certification result. + + This method provides access to the complete probabilistic result + including predicted class and confidence information. + + Args: + verification_context: The verification context + + Returns: + Probabilistic certification result + """ + return self.smoothing_module.verify_probabilistic( + verification_context=verification_context, + n0=self.n0, + n=self.n, + alpha=self.alpha, + sigma=self.smoothing_module.sigma, + batch_size=self.batch_size + ) diff --git a/robustness_experiment_box/verification_module/randomized_smoothing_module.py b/robustness_experiment_box/verification_module/randomized_smoothing_module.py new file mode 100644 index 00000000..c38f661d --- /dev/null +++ b/robustness_experiment_box/verification_module/randomized_smoothing_module.py @@ -0,0 +1,279 @@ +""" +Randomized Smoothing Module for VERONA. + +This module implements probabilistic verification using Monte Carlo sampling +with Gaussian noise, following the approach described in: + +Cohen, J., Rosenfeld, E., and Kolter, Z. (2019). Certified Adversarial Robustness +via Randomized Smoothing. In Proceedings of the 36th International Conference +on Machine Learning. + +The module supports both standard randomized smoothing and diffusion denoised +smoothing extensions. +""" + +import time +from typing import Union + +import numpy as np +import torch +from scipy.stats import norm +from statsmodels.stats.proportion import proportion_confint + +from robustness_experiment_box.database.verification_context import VerificationContext +from robustness_experiment_box.database.verification_result import ProbabilisticCertificationResult +from robustness_experiment_box.verification_module.verification_module import VerificationModule + + +class RandomizedSmoothingModule(VerificationModule): + """ + Verification module implementing randomized smoothing for probabilistic robustness certification. + + This module uses Monte Carlo sampling with Gaussian noise to provide probabilistic + guarantees about the robustness of neural network predictions. It supports both + standard randomized smoothing and diffusion denoised smoothing variants. + + The base classifier is loaded from the verification context's network, following + VERONA's architecture where ExperimentRepository manages networks. + + Args: + num_classes: Number of output classes + sigma: Standard deviation of Gaussian noise for smoothing + diffusion_model: Optional diffusion model for denoising (Carlini et al. approach) + diffusion_timestep: Optional timestep for diffusion denoising + """ + + def __init__( + self, + num_classes: int, + sigma: float, + diffusion_model: torch.nn.Module | None = None, + diffusion_timestep: int | None = None + ): + self.num_classes = num_classes + self.sigma = sigma + self.diffusion_model = diffusion_model + self.diffusion_timestep = diffusion_timestep + self.name = f"RandomizedSmoothingModule(sigma={sigma})" + + def verify( + self, + verification_context: VerificationContext, + epsilon: float + ) -> Union[str, "ProbabilisticCertificationResult"]: + """ + Perform traditional verification at a fixed epsilon value. + + For randomized smoothing, this method performs sampling-based certification + and returns a ProbabilisticCertificationResult. + + Args: + verification_context: The verification context + epsilon: The perturbation magnitude (used as sigma for smoothing) + + Returns: + ProbabilisticCertificationResult with certified radius + """ + # Use epsilon as sigma for smoothing if not specified + sigma = epsilon if epsilon > 0 else self.sigma + return self._perform_randomized_smoothing(verification_context, sigma) + + def verify_probabilistic( + self, + verification_context: VerificationContext, + n0: int, + n: int, + alpha: float, + sigma: float, + batch_size: int = 1000, + diffusion_timestep: int | None = None + ) -> ProbabilisticCertificationResult: + """ + Perform probabilistic certification with specified parameters. + + Args: + verification_context: The verification context + n0: Number of samples for initial prediction + n: Number of samples for certification + alpha: Confidence level (failure probability) + sigma: Noise level for smoothing + batch_size: Batch size for sampling + diffusion_timestep: Optional timestep for diffusion denoising + + Returns: + ProbabilisticCertificationResult with predicted class and certified radius + """ + return self._perform_randomized_smoothing( + verification_context, + sigma, + n0=n0, + n=n, + alpha=alpha, + batch_size=batch_size, + diffusion_timestep=diffusion_timestep + ) + + def _perform_randomized_smoothing( + self, + verification_context: VerificationContext, + sigma: float, + n0: int = 100, + n: int = 100000, + alpha: float = 0.001, + batch_size: int = 1000, + diffusion_timestep: int | None = None + ) -> ProbabilisticCertificationResult: + """ + Core implementation of randomized smoothing certification algorithm. + + This follows the certified radius computation from Cohen et al.: + 1. Sample n0 predictions to get the predicted class cA + 2. Sample n predictions to estimate the lower bound on pA + 3. Compute certified radius R = sigma * norm.ppf(pABar) + + Args: + verification_context: The verification context + sigma: Noise standard deviation + n0: Number of samples for prediction + n: Number of samples for certification + alpha: Confidence level + batch_size: Batch size for sampling + diffusion_timestep: Optional diffusion timestep + + Returns: + ProbabilisticCertificationResult with certification + """ + # Load the base classifier from the verification context's network + base_classifier = verification_context.network.load_pytorch_model() + base_classifier.eval() + + if self.diffusion_model is not None: + self.diffusion_model.eval() + + start_time = time.time() + + # Step 1: Get initial prediction with n0 samples + counts_selection = self._sample_noise(verification_context, n0, batch_size, base_classifier) + predicted_class = int(counts_selection.argmax()) + + # Step 2: Estimate pA with n samples + counts_estimation = self._sample_noise(verification_context, n, batch_size, base_classifier) + nA = counts_estimation[predicted_class] + + # Step 3: Compute lower confidence bound on pA + pABar = self._lower_confidence_bound(nA, n, alpha) + + # Step 4: Compute certified radius + # Note: Probabilistic certification always produces a radius (may be 0) + # We don't map to traditional SAT/UNSAT since the semantics are different + certified_radius = max(0.0, sigma * norm.ppf(pABar)) + + verification_time = time.time() - start_time + + return ProbabilisticCertificationResult( + predicted_class=predicted_class, + certified_radius=certified_radius, + confidence=1.0 - alpha, + n0=n0, + n=n, + sigma=sigma, + certification_time=verification_time + ) + + def _sample_noise( + self, + verification_context: VerificationContext, + num_samples: int, + batch_size: int, + base_classifier: torch.nn.Module + ) -> np.ndarray: + """ + Sample the base classifier's prediction under noisy corruptions. + + Args: + verification_context: The verification context + num_samples: Number of samples to collect + batch_size: Batch size for processing + base_classifier: The PyTorch model to use for predictions + + Returns: + Array of class counts for the num_samples predictions + """ + with torch.no_grad(): + counts = np.zeros(self.num_classes, dtype=int) + + for _ in range(int(np.ceil(num_samples / batch_size))): + this_batch_size = min(batch_size, num_samples) + num_samples -= this_batch_size + + # Prepare batch of identical inputs + x_batch = verification_context.data_point.data.repeat(this_batch_size, 1, 1, 1) + + # Add Gaussian noise + noise = torch.randn_like(x_batch) * self.sigma + + # Apply optional diffusion denoising + if self.diffusion_model is not None and self.diffusion_timestep is not None: + x_noisy = x_batch + noise + # Apply diffusion denoising step + x_denoised = self._apply_diffusion_denoising(x_noisy, self.diffusion_timestep) + predictions = base_classifier(x_denoised).argmax(1) + else: + # Standard randomized smoothing + predictions = base_classifier(x_batch + noise).argmax(1) + + counts += self._count_arr(predictions.cpu().numpy(), self.num_classes) + + return counts + + def _apply_diffusion_denoising(self, x_noisy: torch.Tensor, timestep: int) -> torch.Tensor: + """ + Apply diffusion denoising at a specific timestep. + + This is a simplified implementation - in practice, this would use + the full diffusion model denoising process from Carlini et al. + + Args: + x_noisy: Noisy input tensor + timestep: Diffusion timestep for denoising + + Returns: + Denoised tensor + """ + # Placeholder implementation - in practice this would use the actual + # diffusion model to perform denoising at the specified timestep + if self.diffusion_model is not None: + # This would call the diffusion model's denoising step + # For now, we return the noisy input as-is + return x_noisy + return x_noisy + + def _count_arr(self, arr: np.ndarray, length: int) -> np.ndarray: + """ + Count occurrences of each class in prediction array. + + Args: + arr: Array of class predictions + length: Expected number of classes + + Returns: + Array of counts for each class + """ + counts = np.zeros(length, dtype=int) + for idx in arr: + counts[idx] += 1 + return counts + + def _lower_confidence_bound(self, nA: int, N: int, alpha: float) -> float: + """ + Compute lower confidence bound on binomial proportion using Clopper-Pearson method. + + Args: + nA: Number of "successes" (predictions of class A) + N: Total number of samples + alpha: Confidence level + + Returns: + Lower bound on the proportion + """ + return proportion_confint(nA, N, alpha=2 * alpha, method="beta")[0] diff --git a/robustness_experiment_box/verification_module/verification_module.py b/robustness_experiment_box/verification_module/verification_module.py index 72420d2b..2771ab07 100644 --- a/robustness_experiment_box/verification_module/verification_module.py +++ b/robustness_experiment_box/verification_module/verification_module.py @@ -3,10 +3,63 @@ from autoverify.verifier.verification_result import CompleteVerificationData from robustness_experiment_box.database.verification_context import VerificationContext +from robustness_experiment_box.database.verification_result import ProbabilisticCertificationResult class VerificationModule(ABC): + """ + Abstract base class for all verification modules. + + This interface supports both traditional deterministic verification and + probabilistic verification methods like randomized smoothing. + """ + @abstractmethod def verify(self, verification_context: VerificationContext, epsilon: float) -> str | CompleteVerificationData: - """Main method to verify an image for a given network and epsilon value""" + """ + Main method to verify an image for a given network and epsilon value. + + For traditional verification, this performs deterministic verification. + For probabilistic methods, this may perform Monte Carlo sampling. + + Args: + verification_context: The context containing network, data point, and property generator + epsilon: The perturbation magnitude to verify + + Returns: + Either a string result ("SAT"/"UNSAT") or CompleteVerificationData object + """ raise NotImplementedError + + def verify_probabilistic( + self, + verification_context: VerificationContext, + n0: int, + n: int, + alpha: float, + sigma: float, + batch_size: int = 1000, + diffusion_timestep: int | None = None + ) -> "ProbabilisticCertificationResult": + """ + Perform probabilistic certification using Monte Carlo sampling. + + This method supports randomized smoothing and diffusion denoised smoothing approaches. + Default implementation raises NotImplementedError - subclasses should override. + + Args: + verification_context: The context containing network, data point, and property generator + n0: Number of samples for initial prediction + n: Number of samples for certification + alpha: Confidence level (failure probability) + sigma: Noise level for randomized smoothing + batch_size: Batch size for sampling + diffusion_timestep: Optional timestep for diffusion denoising + + Returns: + ProbabilisticCertificationResult with predicted class and certified radius + + Raises: + NotImplementedError: If subclass doesn't support probabilistic certification + """ + raise NotImplementedError("Probabilistic certification not implemented in this module") diff --git a/scripts/randomized_smoothing_example.py b/scripts/randomized_smoothing_example.py new file mode 100644 index 00000000..e2c6c837 --- /dev/null +++ b/scripts/randomized_smoothing_example.py @@ -0,0 +1,150 @@ +""" +Example usage of Randomized Smoothing with VERONA. + +This example demonstrates how to use probabilistic certification. + +Usage: + python randomized_smoothing_example.py +""" + +import logging +from pathlib import Path + +import torchvision +import torchvision.transforms as transforms + +from robustness_experiment_box.database.dataset.pytorch_experiment_dataset import PytorchExperimentDataset +from robustness_experiment_box.database.experiment_repository import ExperimentRepository +from robustness_experiment_box.dataset_sampler.predictions_based_sampler import PredictionsBasedSampler +from robustness_experiment_box.epsilon_value_estimator.randomized_smoothing_estimator import ( + RandomizedSmoothingEstimator, +) +from robustness_experiment_box.verification_module.property_generator.one2any_property_generator import ( + One2AnyPropertyGenerator, +) +from robustness_experiment_box.verification_module.randomized_smoothing_module import RandomizedSmoothingModule + +# Configure logging +logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration - matches existing script structure +experiment_name = "randomized_smoothing_example" +experiment_repository_path = Path("../tests/test_experiment") +network_folder = Path("../tests/test_experiment/data/networks") + +# Certification parameters +sigma = 0.25 # Noise level for smoothing +n0 = 100 # Samples for prediction +n = 10000 # Samples for certification +alpha = 0.001 # Confidence level +batch_size = 1000 + +def main(): + """Demonstrate randomized smoothing certification following VERONA patterns.""" + + # Load dataset (same as existing scripts) + logger.info("Loading MNIST dataset...") + dataset = torchvision.datasets.MNIST( + root="./data", train=False, download=True, + transform=transforms.ToTensor() + ) + experiment_dataset = PytorchExperimentDataset(dataset) + + # Number of classes for MNIST + num_classes = 10 + + # Create randomized smoothing module (replaces formal verifier) + smoothing_module = RandomizedSmoothingModule( + num_classes=num_classes, + sigma=sigma + ) + + # Create estimator (replaces binary search estimator) + estimator = RandomizedSmoothingEstimator( + smoothing_module=smoothing_module, + n0=n0, + n=n, + alpha=alpha, + batch_size=batch_size + ) + + # Set up experiment repository (same as existing scripts) + experiment_repository = ExperimentRepository( + base_path=experiment_repository_path, + network_folder=network_folder + ) + + # Initialize experiment (same as existing scripts) + experiment_repository.initialize_new_experiment(experiment_name) + + # Save configuration (same as existing scripts) + experiment_repository.save_configuration( + dict( + experiment_name=experiment_name, + experiment_repository_path=str(experiment_repository_path), + network_folder=str(network_folder), + dataset=str(experiment_dataset), + sigma=sigma, + n0=n0, + n=n, + alpha=alpha, + batch_size=batch_size, + ) + ) + + # Create property generator (same as existing scripts) + property_generator = One2AnyPropertyGenerator() + + # Create sampler (same as existing scripts) + dataset_sampler = PredictionsBasedSampler(sample_correct_predictions=True) + + # Get network list (same as existing scripts) + network_list = experiment_repository.get_network_list() + logger.info(f"Found {len(network_list)} networks") + + # Main processing loop (same pattern as existing scripts) + for network in network_list: + logger.info(f"Processing network: {network.path}") + + # Sample data (same as existing scripts) + sampled_data = dataset_sampler.sample(network, experiment_dataset) + logger.info(f"Processing {len(sampled_data)} data points...") + + for data_point in sampled_data: + logger.info(f" Processing image ID: {data_point.id}, Label: {data_point.label}") + + # Create verification context (same as existing scripts) + verification_context = experiment_repository.create_verification_context( + network, data_point, property_generator + ) + + # Compute certified radius (same method call as existing scripts) + epsilon_value_result = estimator.compute_epsilon_value(verification_context) + + # Save result (same as existing scripts) + experiment_repository.save_result(epsilon_value_result) + + # Show detailed probabilistic result + probabilistic_result = estimator.get_probabilistic_result(verification_context) + + logger.info(" Results:") + logger.info(f" Certified radius: {epsilon_value_result.epsilon:.4f}") + logger.info(f" Predicted class: {probabilistic_result.predicted_class}") + logger.info(f" Statistical confidence: {probabilistic_result.confidence:.4f}") + logger.info(f" Samples used: n0={probabilistic_result.n0}, n={probabilistic_result.n}") + logger.info(f" Certification time: {probabilistic_result.certification_time:.2f}s") + + # Save plots and finalize (same as existing scripts) + experiment_repository.save_plots() + + logger.info("\n" + "="*60) + logger.info("RANDOMIZED SMOOTHING CERTIFICATION COMPLETE") + logger.info("="*60) + logger.info(f"Experiment: {experiment_name}") + logger.info(f"Networks processed: {len(network_list)}") + logger.info(f"Total certified radii computed: {len(sampled_data) * len(network_list)}") + logger.info("="*60) + +if __name__ == "__main__": + main()