Source code for epydemix.calibration.abc

import copy
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import numpy as np
import pandas as pd

from ..utils.abc_smc_utils import (
    DefaultPerturbationContinuous,
    DefaultPerturbationDiscrete,
    sample_prior,
)
from .calibration_results import CalibrationResults
from .metrics import rmse


[docs] class ABCSampler: """ Approximate Bayesian Computation (ABC) class implementing different ABC strategies. """ def __init__( self, simulation_function: Callable, priors: Dict[str, Any], parameters: Dict[str, Any], observed_data: Any, distance_function: Callable = rmse, ): """Initialize ABC calibration.""" self.simulation_function = simulation_function self.priors = priors self.parameters = parameters.copy() self.observed_data = {"data": observed_data} self.distance_function = distance_function self.param_names = list(priors.keys()) self.results = None # Separate continuous and discrete parameters self.continuous_params = [ name for name in self.param_names if hasattr(priors[name], "pdf") ] self.discrete_params = [ name for name in self.param_names if name not in self.continuous_params ]
[docs] def calibrate(self, strategy: str = "smc", **kwargs) -> CalibrationResults: """Run calibration using the specified strategy. This function allows the user to run Approximate Bayesian Computation (ABC) calibration using one of the available strategies: Sequential Monte Carlo (SMC), rejection sampling, or top fraction selection. The appropriate method is selected based on the `strategy` argument, and additional keyword arguments (`**kwargs`) are passed to the corresponding function. ### Available Strategies: - `"smc"`: Uses Sequential Monte Carlo (ABC-SMC) for calibration. - `"rejection"`: Uses ABC rejection sampling. - `"top_fraction"`: Selects the best fraction of simulated results. ### Arguments: - **strategy** (`str`, default: `"smc"`): Specifies the calibration strategy. Must be one of `{"smc", "rejection", "top_fraction"}`. - **kwargs**: Additional parameters depending on the chosen strategy. ### Strategy-Specific Arguments: #### `"smc"` (Sequential Monte Carlo) - `num_particles` (`int`, default: `1000`): Number of particles (samples) per generation. - `num_generations` (`int`, default: `10`): Number of generations for the ABC-SMC process. - `epsilon_schedule` (`Optional[List[float]]`, default: `None`): Predefined schedule for epsilon values. - `epsilon_quantile_level` (`float`, default: `0.5`): Quantile level to adapt epsilon if no schedule is provided. - `minimum_epsilon` (`Optional[float]`, default: `None`): Minimum allowable epsilon value. - `max_time` (`Optional[timedelta]`, default: `None`): Maximum allowed runtime. - `total_simulations_budget` (`Optional[int]`, default: `None`): Maximum number of allowed simulations. - `perturbations` (`Optional[Dict[str, Any]]`, default: `None`): Perturbation kernels for parameters. - `verbose` (`bool`, default: `True`): Whether to print progress updates. #### `"rejection"` (ABC Rejection Sampling) - `epsilon` (`float`, default: `0.1`): Distance threshold for accepting samples. - `num_particles` (`int`, default: `1000`): Number of accepted samples. - `max_time` (`Optional[timedelta]`, default: `None`): Maximum allowed runtime. - `total_simulations_budget` (`Optional[int]`, default: `None`): Maximum number of allowed simulations. - `verbose` (`bool`, default: `True`): Whether to print progress updates. - `progress_update_interval` (`int`, default: `1000`): Interval at which progress updates are printed. #### `"top_fraction"` (ABC Top-Fraction Selection) - `top_fraction` (`float`, default: `0.05`): Fraction of best-fitting simulations to keep. - `Nsim` (`int`, default: `100`): Total number of simulations to run. - `verbose` (`bool`, default: `True`): Whether to print progress updates. ### Returns: - `CalibrationResults`: A deep copy of the results from the chosen calibration strategy. ### Raises: - `ValueError`: If an unknown strategy is specified. Example Usage: ```python # Run SMC calibration with custom parameters results = model.calibrate(strategy="smc", num_particles=500, num_generations=15) # Run rejection sampling with a different epsilon threshold results = model.calibrate(strategy="rejection", epsilon=0.05, num_particles=2000) # Run top fraction selection with a different fraction results = model.calibrate(strategy="top_fraction", top_fraction=0.1, Nsim=500) ``` """ strategies = { "smc": self.run_smc, "rejection": self.run_rejection, "top_fraction": self.run_top_fraction, } if strategy not in strategies: raise ValueError( f"Unknown strategy: {strategy}. Must be one of {list(strategies.keys())}" ) self.results = strategies[strategy](**kwargs) return copy.deepcopy(self.results)
[docs] def run_smc( self, num_particles: int = 1000, num_generations: int = 10, epsilon_schedule: Optional[List[float]] = None, epsilon_quantile_level: float = 0.5, minimum_epsilon: Optional[float] = None, max_time: Optional[timedelta] = None, total_simulations_budget: Optional[int] = None, perturbations: Optional[Dict[str, Any]] = None, verbose: bool = True, ) -> CalibrationResults: """Run ABC-SMC calibration.""" # Initialize perturbations if not provided if perturbations is None: perturbations = { param: ( DefaultPerturbationContinuous(param) if param in self.continuous_params else DefaultPerturbationDiscrete(param, self.priors[param]) ) for param in self.param_names } if verbose: print( f"Starting ABC-SMC with {num_particles} particles and {num_generations} generations" ) # Run generations start_time = datetime.now() # _initialize_particles and _run_smc_generation receive n_simulations from # previous generation and return the updated (accumulated) total. n_simulations = 0 results = None for gen in range(num_generations): start_generation_time = datetime.now() if gen == 0: epsilon = ( epsilon_schedule[0] if epsilon_schedule is not None else float("inf") ) if verbose: print( f"\nGeneration {gen + 1}/{num_generations} (epsilon: {epsilon:.6f})" ) # Initialize particles, weights, distances, simulations new_gen = self._initialize_particles( num_particles, epsilon, start_time, max_time, total_simulations_budget, n_simulations, ) if new_gen is None: if verbose: print("Maximum time or budget reached during generation 0") break n_simulations = new_gen["n_simulations"] # Store results for generation 0 results = self._create_results( "smc", pd.DataFrame( data={ self.param_names[i]: new_gen["particles"][:, i] for i in range(len(self.param_names)) } ), new_gen["weights"], new_gen["distances"], new_gen["simulations"], ) particles = new_gen["particles"] weights = new_gen["weights"] distances = new_gen["distances"] else: # Compute epsilon for this generation epsilon = ( epsilon_schedule[gen] if epsilon_schedule is not None else np.quantile(distances, epsilon_quantile_level) ) if verbose: print( f"\nGeneration {gen + 1}/{num_generations} (epsilon: {epsilon:.6f})" ) # Update perturbations for perturbation in perturbations.values(): perturbation.update(particles, weights, self.param_names) # Run generation new_gen = self._run_smc_generation( particles, weights, epsilon, num_particles, perturbations, start_time, max_time, total_simulations_budget, n_simulations, ) if new_gen is None: if verbose: print( f"Maximum time or budget reached during generation {gen + 1}, keeping last complete generation" ) break n_simulations = new_gen["n_simulations"] # Store results results.posterior_distributions[gen] = pd.DataFrame( data={ self.param_names[i]: new_gen["particles"][:, i] for i in range(len(self.param_names)) } ) results.distances[gen] = new_gen["distances"] results.weights[gen] = new_gen["weights"] results.selected_trajectories[gen] = new_gen["simulations"] # Update current generation particles = new_gen["particles"] weights = new_gen["weights"] distances = new_gen["distances"] if verbose: # Print generation information end_generation_time = datetime.now() elapsed_time = end_generation_time - start_generation_time formatted_time = f"{elapsed_time.seconds // 3600:02}:{(elapsed_time.seconds % 3600) // 60:02}:{elapsed_time.seconds % 60:02}" acceptance_rate = ( len(new_gen["particles"]) / new_gen["n_simulations"] * 100 ) print( f"\tAccepted {len(new_gen['particles'])}/{new_gen['n_simulations']} (acceptance rate: {acceptance_rate:.2f}%)" ) print(f"\tElapsed time: {formatted_time}") # Check stopping conditions between generations if self._check_stopping_conditions( epsilon, minimum_epsilon, start_time, max_time, n_simulations, total_simulations_budget, ): break # If generation 0 was interrupted before completing, return empty results if results is None: results = CalibrationResults( calibration_strategy="smc", observed_data=self.observed_data, priors=self.priors, ) return results
[docs] def run_rejection( self, epsilon: float = 0.1, num_particles: int = 1000, max_time: Optional[timedelta] = None, total_simulations_budget: Optional[int] = None, verbose: bool = True, progress_update_interval: int = 1000, ) -> CalibrationResults: """Run ABC rejection sampling.""" simulations, distances = [], [] sampled_params = {p: [] for p in self.param_names} start_time = datetime.now() n_simulations = 0 last_print = 0 # For tracking progress updates if verbose: print( f"Starting ABC rejection sampling with {num_particles} particles and epsilon threshold {epsilon}" ) while len(distances) < num_particles: # Check stopping conditions if self._check_stopping_conditions( None, None, start_time, max_time, n_simulations, total_simulations_budget, ): break # Sample and simulate params = self._sample_parameters() simulation = self._run_simulation(params) distance = self.distance_function(self.observed_data, simulation) n_simulations += 1 if distance < epsilon: simulations.append(simulation) distances.append(distance) for i, p in enumerate(self.param_names): sampled_params[p].append(params[i]) # Print progress every progress_update_interval simulations if verbose if ( verbose and n_simulations % progress_update_interval == 0 and n_simulations != last_print ): last_print = n_simulations acceptance_rate = len(distances) / n_simulations * 100 print( f"\tSimulations: {n_simulations}, Accepted: {len(distances)}, " f"Acceptance rate: {acceptance_rate:.2f}%" ) if verbose: print( f"\tFinal: {len(distances)} particles accepted from {n_simulations} simulations " f"({len(distances) / n_simulations * 100:.2f}% acceptance rate)" ) return self._create_results( "rejection", pd.DataFrame(sampled_params), np.ones(len(distances)) / len(distances), np.array(distances), simulations, )
[docs] def run_top_fraction( self, top_fraction: float = 0.05, Nsim: int = 100, verbose: bool = True ) -> CalibrationResults: """Run ABC top fraction selection.""" simulations, distances = [], [] sampled_params = {p: [] for p in self.param_names} if verbose: print( f"Starting ABC top fraction selection with {Nsim} simulations and top {top_fraction * 100:.1f}% selected" ) for n in range(Nsim): params = self._sample_parameters() simulation = self._run_simulation(params) distance = self.distance_function(self.observed_data, simulation) simulations.append(simulation) distances.append(distance) for i, p in enumerate(self.param_names): sampled_params[p].append(params[i]) # Print progress every 10% if verbose if verbose and (n + 1) % max(1, Nsim // 10) == 0: print( f"\tProgress: {n + 1}/{Nsim} simulations completed ({(n + 1) / Nsim * 100:.1f}%)" ) # Select top fraction threshold = np.quantile(distances, top_fraction) mask = np.array(distances) <= threshold n_selected = sum(mask) if verbose: print( f"\tSelected {n_selected} particles (top {top_fraction * 100:.1f}%) " f"with distance threshold {threshold:.6f}" ) return self._create_results( "top_fraction", pd.DataFrame(sampled_params)[mask], np.ones(sum(mask)) / sum(mask), np.array(distances)[mask], np.array(simulations)[mask], )
def _run_simulation(self, params: List[float]) -> Dict[str, Any]: """Run a single simulation with given parameters.""" full_params = {**self.parameters, **dict(zip(self.param_names, params))} simulation = self.simulation_function(full_params) return self._validate_simulation(simulation) def _validate_simulation(self, simulation: Any) -> Dict[str, Any]: """Ensure simulation output is in correct format.""" if not isinstance(simulation, dict): raise ValueError( f"Simulation must return dictionary, got {type(simulation)}" ) return simulation def _sample_parameters(self) -> List[float]: """Sample parameters from priors.""" return sample_prior(self.priors, self.param_names) def _create_results( self, strategy: str, particles: pd.DataFrame, weights: np.ndarray, distances: np.ndarray, simulations: List[Dict], ) -> CalibrationResults: """Create CalibrationResults object.""" return CalibrationResults( calibration_strategy=strategy, posterior_distributions={0: particles}, selected_trajectories={0: simulations}, distances={0: distances}, weights={0: weights}, observed_data=self.observed_data, priors=self.priors, ) def _check_stopping_conditions( self, epsilon: Optional[float], minimum_epsilon: Optional[float], start_time: datetime, max_time: Optional[timedelta], n_simulations: int, total_simulations_budget: Optional[int], verbose: bool = True, ) -> bool: """Check if any stopping condition is met (epsilon convergence, time limit, or budget). Use verbose=False from per-simulation calls to avoid repeated output. Args: epsilon (float, optional): Current epsilon value minimum_epsilon (float, optional): Minimum allowable epsilon value start_time (datetime): Start time of the calibration run max_time (timedelta, optional): Maximum allowed runtime n_simulations (int): Number of simulations performed so far total_simulations_budget (int, optional): Maximum number of allowed simulations verbose (bool): Whether to print a message when a condition is met Returns: bool: True if any stopping condition is met, False otherwise """ if minimum_epsilon and epsilon and epsilon < minimum_epsilon: if verbose: print("Minimum epsilon reached") return True if max_time and datetime.now() - start_time > max_time: if verbose: print("Maximum time reached") return True if total_simulations_budget and n_simulations > total_simulations_budget: if verbose: print("Total simulations budget reached") return True return False def _initialize_particles( self, num_particles, epsilon, start_time=None, max_time=None, total_simulations_budget=None, n_simulations=0, ): """ Initialize the first generation of particles by sampling from priors. Args: num_particles (int): Number of particles to generate epsilon (float): Epsilon threshold for initial generation start_time (datetime, optional): Start time for time limit checking max_time (timedelta, optional): Maximum allowed runtime total_simulations_budget (int, optional): Maximum number of simulations n_simulations (int): Running count of simulations performed so far Returns: tuple: (particles, weights, distances, simulations) where - particles: numpy array of shape (num_particles, num_parameters) - weights: numpy array of uniform weights - distances: numpy array of distances between simulations and observed data - simulations: list of simulation results Returns None if stopped early by time/budget limits. """ particles, weights, distances, simulations = [], [], [], [] # Sample from priors and run simulations while len(particles) < num_particles: # Check stopping conditions per simulation if self._check_stopping_conditions( None, None, start_time, max_time, n_simulations, total_simulations_budget, verbose=False, ): return None params = sample_prior(self.priors, self.param_names) full_params = {**self.parameters, **dict(zip(self.param_names, params))} simulated_data = self.simulation_function(full_params) dist = self.distance_function( data=self.observed_data, simulation=simulated_data ) n_simulations += 1 if dist <= epsilon: particles.append(params) weights.append(1.0 / num_particles) # Uniform weights initially distances.append(dist) simulations.append(simulated_data) return { "particles": np.array(particles), "weights": np.array(weights), "distances": np.array(distances), "simulations": simulations, "n_simulations": n_simulations, } def _run_smc_generation( self, particles: np.ndarray, weights: np.ndarray, epsilon: float, num_particles: int, perturbations: Dict[str, Any], start_time=None, max_time=None, total_simulations_budget=None, n_simulations=0, ) -> Optional[Dict[str, Any]]: """Run a single generation of ABC-SMC. Returns None if stopped early by time/budget limits. """ new_particles, new_weights, new_distances, new_simulations = [], [], [], [] for _ in range(num_particles): while True: # Check stopping conditions inside inner loop if self._check_stopping_conditions( None, None, start_time, max_time, n_simulations, total_simulations_budget, verbose=False, ): return None # Resample a particle based on weights index = np.random.choice(len(particles), p=weights / weights.sum()) candidate_params = particles[index] # Propose new parameters (perturbation kernel) perturbed_params = [ perturbations[self.param_names[i]].propose(candidate_params[i]) for i in range(len(self.param_names)) ] # Check if perturbed parameters have prior probability > 0 prior_probabilities = [ self.priors[param].pdf(perturbed_params[i]) if param in self.continuous_params else self.priors[param].pmf(perturbed_params[i]) for i, param in enumerate(self.param_names) ] if all(prob > 0 for prob in prior_probabilities): simulation = self._run_simulation(perturbed_params) distance = self.distance_function(self.observed_data, simulation) n_simulations += 1 if distance < epsilon: new_particles.append(perturbed_params) weight_numerator = np.prod( [ self.priors[param].pdf(perturbed_params[i]) if param in self.continuous_params else self.priors[param].pmf(perturbed_params[i]) for i, param in enumerate(self.param_names) ] ) weight_denominator = np.sum( [ weights[j] * np.prod( [ perturbations[self.param_names[i]].pdf( perturbed_params[i], particles[j][i] ) for i in range(len(self.param_names)) ] ) for j in range(len(particles)) ] ) new_weights.append(weight_numerator / weight_denominator) new_distances.append(distance) new_simulations.append(simulation) break # Normalize weights new_weights = np.array(new_weights) new_weights /= new_weights.sum() return { "particles": np.array(new_particles), "weights": np.array(new_weights), "distances": np.array(new_distances), "simulations": new_simulations, "n_simulations": n_simulations, }
[docs] def run_projections( self, parameters: Dict[str, Any], iterations: int = 100, generation: Optional[int] = None, scenario_id: str = "baseline", ) -> CalibrationResults: """ Run projections using parameters sampled from the posterior distribution. Args: parameters: Dictionary of parameters for the projections iterations: Number of projection iterations to run. Default is 100. generation: Which generation to use for posterior. If None, the last generation is used. scenario_id: Identifier for this projection scenario. Default is "baseline". Returns: CalibrationResults: A new CalibrationResults object containing the original results plus the new projections """ # Get posterior distribution and weights from specified generation posterior = self.results.get_posterior_distribution(generation) weights = self.results.get_weights(generation) # Run projections and store results projections, posterior_samples = [], {} for _ in range(iterations): # Sample from posterior according to weights idx = np.random.choice(len(posterior), p=weights / weights.sum()) posterior_sample = posterior.iloc[idx] for k in posterior_sample.keys(): if k not in posterior_samples: posterior_samples[k] = [] posterior_samples[k].append(posterior_sample[k]) # Prepare and run simulation proj_params = parameters.copy() proj_params.update(posterior_sample) result = self.simulation_function(proj_params) projections.append(result) self.results.projections[scenario_id] = projections self.results.projection_parameters[scenario_id] = pd.DataFrame( posterior_samples ) return copy.deepcopy(self.results)