Source code for epydemix.utils.utils

import datetime
import random
import string
from collections.abc import Iterable
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from evalidate import Expr, base_eval_model

try:
    from numba import njit

    _NUMBA_AVAILABLE = True
except ImportError:
    _NUMBA_AVAILABLE = False

    # Transparent fallback: @njit becomes a no-op decorator
[docs] def njit(*args, **kwargs): def decorator(fn): return fn return decorator if args and callable(args[0]) else decorator
[docs] def is_scalar(value): return np.isscalar(value) and not isinstance(value, (str, bytes))
[docs] def is_iterable(value): return isinstance(value, Iterable) and not isinstance(value, (str, bytes))
[docs] def validate_parameter_shape(key: str, value: Any, T: int, n_age: int) -> None: """ Validates the shape of the input value based on its type and expected dimensions. Args: key (str): The key associated with the value in the parameters dictionary. value (Union[np.ndarray, Iterable]): The value to be validated, which can be a NumPy array or an iterable. T (int): The expected length of the first dimension. n_age (int): The expected length of the second dimension. Raises: ValueError: If the value does not meet the required shape criteria or if the type is unsupported. """ # if isinstance(value, (int, float)): if is_scalar(value): return # Scalars don't need validation elif is_iterable(value): value = np.array(value) if value.ndim == 1: # 1D array if len(value) < T: raise ValueError( f"The length of the 1D iterable for parameter '{key}' is smaller than simulation length ({T})." ) elif value.ndim == 2: # 2D array if value.shape[0] != T and value.shape[0] != 1: raise ValueError( f"The first dimension of the 2D iterable for parameter '{key}' must be 1 or match simulation length ({T})." ) if value.shape[1] != n_age: raise ValueError( f"The second dimension of the 2D iterable for parameter '{key}' must match number of age groups ({n_age})." ) else: raise ValueError( f"Unsupported number of dimensions for parameter '{key}': {value.ndim}" ) else: raise ValueError(f"Unsupported type for parameter '{key}': {type(value)}")
[docs] def resize_parameter(value: Any, T: int, n_age: int) -> np.ndarray: """ Resizes the input value to have the shape (T, n_age). Args: value (Union[np.ndarray, int, float]): The value to be resized, which can be a NumPy array or a scalar. T (int): The length of the first dimension. n_age (int): The length of the second dimension. Returns: np.ndarray: A 2D array with shape (T, n_age). """ if is_scalar(value): # Scalar value return np.full((T, n_age), value) value = np.array(value) if value.ndim == 1: # 1D array return np.tile(value, (n_age, 1)).T elif value.ndim == 2: # 2D array if value.shape[0] == 1: # If the first dimension is 1, repeat it to match T return np.tile(value, (T, 1)) return value
[docs] def create_definitions( parameters: Dict[str, Any], T: int, n_age: int ) -> Dict[str, np.ndarray]: """ Generates a dictionary where each value is a 2D array based on the input dictionary and the parameters provided. Args: parameters (Dict[str, Union[np.ndarray, int, float, Iterable]]): A dictionary where values can be either scalars or iterables. T (int): The length of the first dimension of the arrays to be created. n_age (int): The length of the second dimension of the arrays to be created. Returns: Dict[str, np.ndarray]: A dictionary where keys are the same as in `parameters` and values are 2D arrays of shape `(T, n_age)`. Raises: ValueError: If any parameter value does not meet the required shape criteria. """ definitions = {} for key, value in parameters.items(): validate_parameter_shape(key, value, T, n_age) definitions[key] = resize_parameter(value, T, n_age) return definitions
[docs] def format_simulation_output( compartments_evolution: np.ndarray, transitions_evolution: np.ndarray, compartments_idx: Dict[str, int], transitions_idx: Dict[str, int], demographics: List[str], ) -> Dict[str, np.ndarray]: """ Format simulation output into dictionary with proper naming. Args: compartments_evolution: Array of shape (timesteps, n_compartments, n_demographics) transitions_evolution: Array of shape (timesteps, n_transitions, n_demographics) compartments_idx: Dictionary mapping compartment names to their indices transitions_idx: Dictionary mapping transition names to their indices demographics: List of demographic group names """ formatted_output = {"compartments": {}, "transitions": {}} for comp, pos in compartments_idx.items(): for i, dem in enumerate(demographics): key = f"{comp}_{dem}" formatted_output["compartments"][key] = compartments_evolution[:, pos, i] formatted_output["compartments"][f"{comp}_total"] = np.sum( compartments_evolution[:, pos, :], axis=1 ) for trans, pos in transitions_idx.items(): for i, dem in enumerate(demographics): key = f"{trans}_{dem}" formatted_output["transitions"][key] = transitions_evolution[:, pos, i] formatted_output["transitions"][f"{trans}_total"] = np.sum( transitions_evolution[:, pos, :], axis=1 ) return formatted_output
[docs] def str_to_date(date_str: str) -> datetime.date: """ Converts a date string in the format 'YYYY-MM-DD' to a `datetime.date` object. Args: date_str (str): A string representing a date in the format 'YYYY-MM-DD'. Returns: datetime.date: A `datetime.date` object corresponding to the input string. Raises: ValueError: If the input string is not in the correct format. """ return datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
[docs] def apply_overrides( definitions: Dict[str, np.ndarray], overrides: Dict[str, List[Dict[str, Any]]], dates: List[datetime.date], ) -> Dict[str, np.ndarray]: """ Applies parameter overrides to the definitions based on the specified date ranges. Args: definitions (dict): A dictionary where keys are parameter names and values are 2D arrays representing the parameter values over time and demographics. overrides (dict): A dictionary where keys are parameter names and values are lists of override specifications. Each override specification is a dictionary containing: - 'start_date' (str): The start date of the override period in 'YYYY-MM-DD' format. - 'end_date' (str): The end date of the override period in 'YYYY-MM-DD' format. - 'value' (np.ndarray or scalar): The value to override within the specified date range. dates (list): A list of `datetime.date` objects corresponding to the time steps in the definitions arrays. Returns: dict: A dictionary with the same keys as `definitions`, but with values updated according to the overrides. Raises: ValueError: If the `override` values do not match the expected shape for the specified date ranges. """ if not overrides: return definitions result = definitions.copy() for name, overrides in overrides.items(): if name not in definitions: continue for override in overrides: start_date = pd.Timestamp(override["start_date"]) end_date = pd.Timestamp(override["end_date"]) override_value = override["value"] # Convert dates to pandas timestamps for comparison dates_pd = pd.DatetimeIndex(dates) # Validate override value T = sum((dates_pd >= start_date) & (dates_pd <= end_date)) n_age = definitions[name].shape[1] validate_parameter_shape(name, override_value, T=T, n_age=n_age) # Resize override value override_array = resize_parameter(override_value, T=T, n_age=n_age) # Apply override mask = (dates_pd >= start_date) & (dates_pd <= end_date) result[name][mask] = override_array return result
[docs] def generate_unique_string(length: int = 12) -> str: """ Generates a random unique string containing only letters. Args: length (int, optional): The length of the generated string. Defaults to 12. Returns: str: A random unique string containing both uppercase and lowercase letters. """ letters = string.ascii_letters # Contains both lowercase and uppercase letters return "".join(random.choice(letters) for _ in range(length))
[docs] def compute_days( start_date: Union[str, pd.Timestamp], end_date: Union[str, pd.Timestamp] ) -> int: """ Computes the number of days between two dates. Args: start_date (Union[str, pd.Timestamp]): The start date in string format (e.g., "YYYY-MM-DD") or as a pandas Timestamp. end_date (Union[str, pd.Timestamp]): The end date in string format (e.g., "YYYY-MM-DD") or as a pandas Timestamp. Returns: int: The number of days between the start date and end date, inclusive. """ return pd.date_range(start_date, end_date).shape[0]
[docs] def evaluate(expr: str, env: dict) -> any: """ Evaluates the expression with the given environment, allowing only whitelisted operations. This function extends the base evaluation model to whitelist the 'Mult' (multiplication) and 'Pow' (power) operations, ensuring that only these operations are permitted during the evaluation. Args: expr (str): The expression to evaluate. It is expected to be a string containing the mathematical expression to be evaluated. env (dict): The environment containing variable values. Keys should be variable names and values should be their corresponding numeric values. Returns: any: The result of evaluating the expression. The result type depends on the expression and its evaluation. Raises: EvalException: If there is an error in evaluating the expression, such as an invalid operation or an undefined variable. """ eval_model = base_eval_model eval_model.nodes.extend(["Mult", "Pow"]) return Expr(expr, model=eval_model).eval(env)
[docs] def compute_simulation_dates( start_date: Union[str, datetime.date, np.datetime64], end_date: Union[str, datetime.date, np.datetime64], steps: Optional[int] = None, dt: float = 1.0, ) -> np.ndarray: """ Compute simulation dates including sub-day intervals. Args: start_date: Start date end_date: End date steps: Number of steps (if None, computed from dt) dt: Time step in days (e.g., 1/3 for 8-hour intervals) Returns: numpy array of datetime64 objects for each simulation step """ # Convert inputs to pandas Timestamp for consistent handling if isinstance(start_date, str): start_date = pd.Timestamp(start_date) elif isinstance(start_date, np.datetime64): start_date = pd.Timestamp(start_date) if isinstance(end_date, str): end_date = pd.Timestamp(end_date) elif isinstance(end_date, np.datetime64): end_date = pd.Timestamp(end_date) # Calculate total duration in days total_days = (end_date - start_date).total_seconds() / (24 * 3600) # Calculate number of steps if not provided if steps is None: steps = int(total_days / dt) # Create timestamps with proper sub-day intervals timestamps = pd.date_range( start=start_date, periods=steps + 1, freq=pd.Timedelta(days=dt) ).values return timestamps
[docs] def apply_initial_conditions(epimodel, initial_conditions_dict) -> np.ndarray: """ Applies initial conditions to the compartments of an epidemiological model. Args: epimodel (EpiModel): An instance of an epidemiological model containing compartments and population information. **kwargs: Keyword arguments where each key is a compartment name and each value is an array or list specifying the initial population for that compartment. Returns: np.ndarray: A 2D array where rows correspond to compartments and columns correspond to demographic groups, representing the initial conditions of the model. The shape of the array is `(number_of_compartments, number_of_demographic_groups)`. """ # initialize population in different compartments and demographic groups initial_conditions = np.zeros( (len(epimodel.compartments), len(epimodel.population.Nk)), dtype="int" ) for comp in epimodel.compartments: if comp in initial_conditions_dict: if comp in epimodel.compartments: initial_conditions[epimodel.compartments_idx[comp]] = ( initial_conditions_dict[comp] ) return initial_conditions
[docs] def convert_to_2Darray(lst: List[Any]) -> np.ndarray: """ Converts a list into a 2D NumPy array. Args: lst (List[Any]): A list of elements to be converted into a 2D array. Elements can be of any type. Returns: np.ndarray: A 2D NumPy array with shape `(1, len(lst))`, where `len(lst)` is the length of the input list. """ arr = np.array(lst) arr = arr.reshape(1, len(lst)) return arr
[docs] def combine_simulation_outputs( simulation_outputs_list: List[Dict[str, np.ndarray]], ) -> Dict[str, List[np.ndarray]]: """ Combines multiple simulation outputs into a single dictionary by appending new outputs to existing keys. Args: simulation_outputs_list (List[Dict[str, np.ndarray]]): A list of dictionaries containing the latest simulation output to be combined. Each dictionary contains keys corresponding to compartment-demographic names and values are arrays of simulation results. Returns: Dict[str, List[np.ndarray]]: A dictionary where keys are compartment-demographic names and values are lists of arrays. Each list contains simulation results accumulated from multiple runs. """ combined_simulation_outputs = {} for simulation_outputs in simulation_outputs_list: if not combined_simulation_outputs: # If combined_dict is empty, initialize it with the new dictionary for key, value in simulation_outputs.items(): combined_simulation_outputs[key] = [value] else: # If combined_dict already has keys, append the new dictionary's values for key, value in simulation_outputs.items(): if key in combined_simulation_outputs: combined_simulation_outputs[key].append(value) else: combined_simulation_outputs[key] = [value] # cast lists to arrays for key in combined_simulation_outputs.items(): combined_simulation_outputs[key] = np.array(value) return combined_simulation_outputs
@njit(cache=True) def _multinomial_probs(n, rates, stay_idx, mask, dt, apply_linear_approximation): """ Compiled probability computation for a multinomial draw with a 'stay' compartment. Returns a float64 probs array ready to pass to rng.multinomial. The RNG draw is kept in Python (see `multinomial`) to preserve numpy Generator seeding and reproducibility — Numba's internal RNG is a separate legacy state. """ k = len(rates) probs = np.zeros(k) if n <= 0: probs[stay_idx] = 1.0 return probs H = 0.0 for i in range(k): if mask[i]: H += rates[i] * dt if H <= 0.0: probs[stay_idx] = 1.0 return probs if apply_linear_approximation: for i in range(k): if mask[i]: probs[i] = rates[i] * dt probs[stay_idx] = 1.0 - H return probs p_leave = -np.expm1(-H) scale = p_leave / H for i in range(k): if mask[i]: probs[i] = rates[i] * dt * scale probs[stay_idx] = 1.0 - p_leave return probs
[docs] def multinomial( n, rates, stay_idx, mask, dt, apply_linear_approximation=False, rng=None, probs_out=None, ): """ Multinomial sample with a 'stay' compartment. Probability computation is JIT-compiled via Numba; the draw uses the numpy Generator passed in to preserve reproducibility. Args: n (int): number of trials rates (np.ndarray): array of rates stay_idx (int): index of the stay compartment mask (np.ndarray): boolean array selecting the 'leave' destinations dt (float): time step size apply_linear_approximation (bool): whether to apply a linear approximation to the probabilities rng (np.random.Generator): random number generator probs_out (np.ndarray): unused, kept for API compatibility Returns: np.ndarray: array of multinomial samples """ rng = np.random.default_rng() if rng is None else rng probs = _multinomial_probs(n, rates, stay_idx, mask, dt, apply_linear_approximation) if n <= 0: out = np.zeros(len(rates), dtype=int) out[stay_idx] = int(n) return out return rng.multinomial(int(n), probs)
# Trigger JIT compilation at import time so the first simulation call # doesn't pay the compilation cost. _multinomial_probs( 1.0, np.array([0.1, 0.05, 0.0]), 0, np.array([False, True, True]), 1.0, False, )
[docs] def get_initial_conditions_dict(Nk, perc_dict): """ Helper function to get initial conditions dictionary from percentage dictionary. Args: perc_dict (dict): Dictionary with percentage of population for each compartment Returns: dict: Initial conditions dictionary """ return {key: (Nk * perc_dict[key] / 100).astype(int) for key in perc_dict.keys()}