import datetime
import random
import string
from collections.abc import Iterable
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from evalidate import Expr, base_eval_model
try:
from numba import njit
_NUMBA_AVAILABLE = True
except ImportError:
_NUMBA_AVAILABLE = False
# Transparent fallback: @njit becomes a no-op decorator
[docs]
def njit(*args, **kwargs):
def decorator(fn):
return fn
return decorator if args and callable(args[0]) else decorator
[docs]
def is_scalar(value):
return np.isscalar(value) and not isinstance(value, (str, bytes))
[docs]
def is_iterable(value):
return isinstance(value, Iterable) and not isinstance(value, (str, bytes))
[docs]
def validate_parameter_shape(key: str, value: Any, T: int, n_age: int) -> None:
"""
Validates the shape of the input value based on its type and expected dimensions.
Args:
key (str): The key associated with the value in the parameters dictionary.
value (Union[np.ndarray, Iterable]): The value to be validated, which can be a NumPy array or an iterable.
T (int): The expected length of the first dimension.
n_age (int): The expected length of the second dimension.
Raises:
ValueError: If the value does not meet the required shape criteria or if the type is unsupported.
"""
# if isinstance(value, (int, float)):
if is_scalar(value):
return # Scalars don't need validation
elif is_iterable(value):
value = np.array(value)
if value.ndim == 1: # 1D array
if len(value) < T:
raise ValueError(
f"The length of the 1D iterable for parameter '{key}' is smaller than simulation length ({T})."
)
elif value.ndim == 2: # 2D array
if value.shape[0] != T and value.shape[0] != 1:
raise ValueError(
f"The first dimension of the 2D iterable for parameter '{key}' must be 1 or match simulation length ({T})."
)
if value.shape[1] != n_age:
raise ValueError(
f"The second dimension of the 2D iterable for parameter '{key}' must match number of age groups ({n_age})."
)
else:
raise ValueError(
f"Unsupported number of dimensions for parameter '{key}': {value.ndim}"
)
else:
raise ValueError(f"Unsupported type for parameter '{key}': {type(value)}")
[docs]
def resize_parameter(value: Any, T: int, n_age: int) -> np.ndarray:
"""
Resizes the input value to have the shape (T, n_age).
Args:
value (Union[np.ndarray, int, float]): The value to be resized, which can be a NumPy array or a scalar.
T (int): The length of the first dimension.
n_age (int): The length of the second dimension.
Returns:
np.ndarray: A 2D array with shape (T, n_age).
"""
if is_scalar(value): # Scalar value
return np.full((T, n_age), value)
value = np.array(value)
if value.ndim == 1: # 1D array
return np.tile(value, (n_age, 1)).T
elif value.ndim == 2: # 2D array
if value.shape[0] == 1: # If the first dimension is 1, repeat it to match T
return np.tile(value, (T, 1))
return value
[docs]
def create_definitions(
parameters: Dict[str, Any], T: int, n_age: int
) -> Dict[str, np.ndarray]:
"""
Generates a dictionary where each value is a 2D array based on the input dictionary and the parameters provided.
Args:
parameters (Dict[str, Union[np.ndarray, int, float, Iterable]]): A dictionary where values can be either scalars or iterables.
T (int): The length of the first dimension of the arrays to be created.
n_age (int): The length of the second dimension of the arrays to be created.
Returns:
Dict[str, np.ndarray]: A dictionary where keys are the same as in `parameters` and values are 2D arrays of shape `(T, n_age)`.
Raises:
ValueError: If any parameter value does not meet the required shape criteria.
"""
definitions = {}
for key, value in parameters.items():
validate_parameter_shape(key, value, T, n_age)
definitions[key] = resize_parameter(value, T, n_age)
return definitions
[docs]
def str_to_date(date_str: str) -> datetime.date:
"""
Converts a date string in the format 'YYYY-MM-DD' to a `datetime.date` object.
Args:
date_str (str): A string representing a date in the format 'YYYY-MM-DD'.
Returns:
datetime.date: A `datetime.date` object corresponding to the input string.
Raises:
ValueError: If the input string is not in the correct format.
"""
return datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
[docs]
def apply_overrides(
definitions: Dict[str, np.ndarray],
overrides: Dict[str, List[Dict[str, Any]]],
dates: List[datetime.date],
) -> Dict[str, np.ndarray]:
"""
Applies parameter overrides to the definitions based on the specified date ranges.
Args:
definitions (dict): A dictionary where keys are parameter names and values are 2D arrays
representing the parameter values over time and demographics.
overrides (dict): A dictionary where keys are parameter names and values are lists of override
specifications. Each override specification is a dictionary containing:
- 'start_date' (str): The start date of the override period in 'YYYY-MM-DD' format.
- 'end_date' (str): The end date of the override period in 'YYYY-MM-DD' format.
- 'value' (np.ndarray or scalar): The value to override within the specified date range.
dates (list): A list of `datetime.date` objects corresponding to the time steps in the definitions arrays.
Returns:
dict: A dictionary with the same keys as `definitions`, but with values updated according to the overrides.
Raises:
ValueError: If the `override` values do not match the expected shape for the specified date ranges.
"""
if not overrides:
return definitions
result = definitions.copy()
for name, overrides in overrides.items():
if name not in definitions:
continue
for override in overrides:
start_date = pd.Timestamp(override["start_date"])
end_date = pd.Timestamp(override["end_date"])
override_value = override["value"]
# Convert dates to pandas timestamps for comparison
dates_pd = pd.DatetimeIndex(dates)
# Validate override value
T = sum((dates_pd >= start_date) & (dates_pd <= end_date))
n_age = definitions[name].shape[1]
validate_parameter_shape(name, override_value, T=T, n_age=n_age)
# Resize override value
override_array = resize_parameter(override_value, T=T, n_age=n_age)
# Apply override
mask = (dates_pd >= start_date) & (dates_pd <= end_date)
result[name][mask] = override_array
return result
[docs]
def generate_unique_string(length: int = 12) -> str:
"""
Generates a random unique string containing only letters.
Args:
length (int, optional): The length of the generated string. Defaults to 12.
Returns:
str: A random unique string containing both uppercase and lowercase letters.
"""
letters = string.ascii_letters # Contains both lowercase and uppercase letters
return "".join(random.choice(letters) for _ in range(length))
[docs]
def compute_days(
start_date: Union[str, pd.Timestamp], end_date: Union[str, pd.Timestamp]
) -> int:
"""
Computes the number of days between two dates.
Args:
start_date (Union[str, pd.Timestamp]): The start date in string format (e.g., "YYYY-MM-DD") or as a pandas Timestamp.
end_date (Union[str, pd.Timestamp]): The end date in string format (e.g., "YYYY-MM-DD") or as a pandas Timestamp.
Returns:
int: The number of days between the start date and end date, inclusive.
"""
return pd.date_range(start_date, end_date).shape[0]
[docs]
def evaluate(expr: str, env: dict) -> any:
"""
Evaluates the expression with the given environment, allowing only whitelisted operations.
This function extends the base evaluation model to whitelist the 'Mult' (multiplication)
and 'Pow' (power) operations, ensuring that only these operations are permitted during
the evaluation.
Args:
expr (str): The expression to evaluate. It is expected to be a string containing
the mathematical expression to be evaluated.
env (dict): The environment containing variable values. Keys should be variable names
and values should be their corresponding numeric values.
Returns:
any: The result of evaluating the expression. The result type depends on the expression
and its evaluation.
Raises:
EvalException: If there is an error in evaluating the expression, such as an invalid
operation or an undefined variable.
"""
eval_model = base_eval_model
eval_model.nodes.extend(["Mult", "Pow"])
return Expr(expr, model=eval_model).eval(env)
[docs]
def compute_simulation_dates(
start_date: Union[str, datetime.date, np.datetime64],
end_date: Union[str, datetime.date, np.datetime64],
steps: Optional[int] = None,
dt: float = 1.0,
) -> np.ndarray:
"""
Compute simulation dates including sub-day intervals.
Args:
start_date: Start date
end_date: End date
steps: Number of steps (if None, computed from dt)
dt: Time step in days (e.g., 1/3 for 8-hour intervals)
Returns:
numpy array of datetime64 objects for each simulation step
"""
# Convert inputs to pandas Timestamp for consistent handling
if isinstance(start_date, str):
start_date = pd.Timestamp(start_date)
elif isinstance(start_date, np.datetime64):
start_date = pd.Timestamp(start_date)
if isinstance(end_date, str):
end_date = pd.Timestamp(end_date)
elif isinstance(end_date, np.datetime64):
end_date = pd.Timestamp(end_date)
# Calculate total duration in days
total_days = (end_date - start_date).total_seconds() / (24 * 3600)
# Calculate number of steps if not provided
if steps is None:
steps = int(total_days / dt)
# Create timestamps with proper sub-day intervals
timestamps = pd.date_range(
start=start_date, periods=steps + 1, freq=pd.Timedelta(days=dt)
).values
return timestamps
[docs]
def apply_initial_conditions(epimodel, initial_conditions_dict) -> np.ndarray:
"""
Applies initial conditions to the compartments of an epidemiological model.
Args:
epimodel (EpiModel): An instance of an epidemiological model containing compartments and population information.
**kwargs: Keyword arguments where each key is a compartment name and each value is an array or list
specifying the initial population for that compartment.
Returns:
np.ndarray: A 2D array where rows correspond to compartments and columns correspond to demographic groups,
representing the initial conditions of the model. The shape of the array is
`(number_of_compartments, number_of_demographic_groups)`.
"""
# initialize population in different compartments and demographic groups
initial_conditions = np.zeros(
(len(epimodel.compartments), len(epimodel.population.Nk)), dtype="int"
)
for comp in epimodel.compartments:
if comp in initial_conditions_dict:
if comp in epimodel.compartments:
initial_conditions[epimodel.compartments_idx[comp]] = (
initial_conditions_dict[comp]
)
return initial_conditions
[docs]
def convert_to_2Darray(lst: List[Any]) -> np.ndarray:
"""
Converts a list into a 2D NumPy array.
Args:
lst (List[Any]): A list of elements to be converted into a 2D array. Elements can be of any type.
Returns:
np.ndarray: A 2D NumPy array with shape `(1, len(lst))`, where `len(lst)` is the length of the input list.
"""
arr = np.array(lst)
arr = arr.reshape(1, len(lst))
return arr
[docs]
def combine_simulation_outputs(
simulation_outputs_list: List[Dict[str, np.ndarray]],
) -> Dict[str, List[np.ndarray]]:
"""
Combines multiple simulation outputs into a single dictionary by appending new outputs to existing keys.
Args:
simulation_outputs_list (List[Dict[str, np.ndarray]]): A list of dictionaries containing the latest simulation output to be combined.
Each dictionary contains keys corresponding to compartment-demographic names and values are arrays of simulation results.
Returns:
Dict[str, List[np.ndarray]]: A dictionary where keys are compartment-demographic names and values are lists of arrays.
Each list contains simulation results accumulated from multiple runs.
"""
combined_simulation_outputs = {}
for simulation_outputs in simulation_outputs_list:
if not combined_simulation_outputs:
# If combined_dict is empty, initialize it with the new dictionary
for key, value in simulation_outputs.items():
combined_simulation_outputs[key] = [value]
else:
# If combined_dict already has keys, append the new dictionary's values
for key, value in simulation_outputs.items():
if key in combined_simulation_outputs:
combined_simulation_outputs[key].append(value)
else:
combined_simulation_outputs[key] = [value]
# cast lists to arrays
for key in combined_simulation_outputs.items():
combined_simulation_outputs[key] = np.array(value)
return combined_simulation_outputs
@njit(cache=True)
def _multinomial_probs(n, rates, stay_idx, mask, dt, apply_linear_approximation):
"""
Compiled probability computation for a multinomial draw with a 'stay' compartment.
Returns a float64 probs array ready to pass to rng.multinomial.
The RNG draw is kept in Python (see `multinomial`) to preserve numpy Generator
seeding and reproducibility — Numba's internal RNG is a separate legacy state.
"""
k = len(rates)
probs = np.zeros(k)
if n <= 0:
probs[stay_idx] = 1.0
return probs
H = 0.0
for i in range(k):
if mask[i]:
H += rates[i] * dt
if H <= 0.0:
probs[stay_idx] = 1.0
return probs
if apply_linear_approximation:
for i in range(k):
if mask[i]:
probs[i] = rates[i] * dt
probs[stay_idx] = 1.0 - H
return probs
p_leave = -np.expm1(-H)
scale = p_leave / H
for i in range(k):
if mask[i]:
probs[i] = rates[i] * dt * scale
probs[stay_idx] = 1.0 - p_leave
return probs
[docs]
def multinomial(
n,
rates,
stay_idx,
mask,
dt,
apply_linear_approximation=False,
rng=None,
probs_out=None,
):
"""
Multinomial sample with a 'stay' compartment.
Probability computation is JIT-compiled via Numba; the draw uses the
numpy Generator passed in to preserve reproducibility.
Args:
n (int): number of trials
rates (np.ndarray): array of rates
stay_idx (int): index of the stay compartment
mask (np.ndarray): boolean array selecting the 'leave' destinations
dt (float): time step size
apply_linear_approximation (bool): whether to apply a linear approximation to the probabilities
rng (np.random.Generator): random number generator
probs_out (np.ndarray): unused, kept for API compatibility
Returns:
np.ndarray: array of multinomial samples
"""
rng = np.random.default_rng() if rng is None else rng
probs = _multinomial_probs(n, rates, stay_idx, mask, dt, apply_linear_approximation)
if n <= 0:
out = np.zeros(len(rates), dtype=int)
out[stay_idx] = int(n)
return out
return rng.multinomial(int(n), probs)
# Trigger JIT compilation at import time so the first simulation call
# doesn't pay the compilation cost.
_multinomial_probs(
1.0,
np.array([0.1, 0.05, 0.0]),
0,
np.array([False, True, True]),
1.0,
False,
)
[docs]
def get_initial_conditions_dict(Nk, perc_dict):
"""
Helper function to get initial conditions dictionary from percentage dictionary.
Args:
perc_dict (dict): Dictionary with percentage of population for each compartment
Returns:
dict: Initial conditions dictionary
"""
return {key: (Nk * perc_dict[key] / 100).astype(int) for key in perc_dict.keys()}