"""
Sampling utilities.
This module provides a collection of small helper functions for random
sampling, including:
* Sampling from sequences (with/without replacement)
* Uniform scalar and array sampling
* Gaussian (normal) scalar and array sampling
* Log-normal, exponential, Poisson, binomial, Dirichlet, and
multivariate normal sampling
* Cached sigma perturbation generator for 6D state vectors (get_sigmas)
All random draws are implemented via NumPy's RNG.
"""
from __future__ import annotations
import os
import random
from pathlib import Path
from typing import Sequence, Optional
import numpy as np
# ---------------------------------------------------------------------
# Cache settings for get_sigmas
# ---------------------------------------------------------------------
ENV_VAR = "ssapy_toolkit_CACHE"
DEFAULT_FILE = "covariance_sigmas.npy"
# ---------------------------------------------------------------------
# Sequence sampling / simple random numbers
# ---------------------------------------------------------------------
[docs]
def sample_from_sequence(seq: Sequence, n: int, replacement: bool = False) -> np.ndarray:
"""
Uniformly sample `n` elements from a sequence.
Parameters
----------
seq : Sequence
The sequence to sample from.
n : int
Number of elements to sample.
replacement : bool, default False
If True, sample with replacement; otherwise without replacement.
Returns
-------
np.ndarray
Array of sampled elements.
Notes
-----
This is a thin wrapper around numpy.random.choice with `p=None`
(uniform discrete sampling over the sequence indices).
"""
arr = np.array(seq)
return np.random.choice(arr, size=n, replace=replacement)
[docs]
def rand_num(low: float = 0.0, high: float = 1.0) -> float:
"""
Draw a single uniform random float in [low, high).
Parameters
----------
low : float, default 0.0
Lower bound (inclusive).
high : float, default 1.0
Upper bound (exclusive).
Returns
-------
float
Uniform random number in [low, high).
"""
return float(np.random.uniform(low, high, 1).astype("float64"))
[docs]
def shuffle(x: list) -> None:
"""
Shuffle a list in-place using Python's built-in RNG.
Parameters
----------
x : list
List to shuffle.
Returns
-------
None
"""
return random.shuffle(x)
[docs]
def random_arr(
low: float = 0,
high: float = 1,
size: tuple = (1, 10),
dtype: str = "float64",
) -> np.ndarray:
"""
Generate a random array with specified bounds and size.
Parameters
----------
low : float, default 0
Lower bound.
high : float, default 1
Upper bound (for floats) or inclusive upper bound (for ints).
size : tuple, default (1, 10)
Shape of the array.
dtype : str, default 'float64'
Data type. If 'int' is in the string, integer sampling is used.
Returns
-------
np.ndarray
Random array.
"""
if "int" in dtype:
return np.random.randint(low, high + 1, size, dtype=dtype)
else:
return np.random.uniform(low, high, size).astype(dtype)
# ---------------------------------------------------------------------
# More specific distribution helpers
# ---------------------------------------------------------------------
[docs]
def normal_scalar(mean: float = 0.0, std: float = 1.0) -> float:
"""
Gaussian N(mean, std^2) scalar sample.
Returns
-------
float
"""
return float(np.random.normal(loc=mean, scale=std, size=1))
[docs]
def normal_array(
mean: float = 0.0,
std: float = 1.0,
size: tuple = (1, 10),
dtype: str = "float64",
) -> np.ndarray:
"""
Gaussian N(mean, std^2) array sample.
Returns
-------
np.ndarray
"""
return np.random.normal(loc=mean, scale=std, size=size).astype(dtype)
[docs]
def lognormal_array(
mean: float = 0.0,
sigma: float = 1.0,
size: tuple = (1, 10),
dtype: str = "float64",
) -> np.ndarray:
"""
Log-normal array sample: exp( N(mean, sigma^2) ).
Returns
-------
np.ndarray
"""
return np.random.lognormal(mean=mean, sigma=sigma, size=size).astype(dtype)
[docs]
def exponential_array(
scale: float = 1.0,
size: tuple = (1, 10),
dtype: str = "float64",
) -> np.ndarray:
"""
Exponential(scale) array sample.
Parameters
----------
scale : float
1 / lambda, the mean of the distribution.
Returns
-------
np.ndarray
"""
return np.random.exponential(scale=scale, size=size).astype(dtype)
[docs]
def poisson_array(
lam: float = 1.0,
size: tuple = (1, 10),
dtype: str = "int64",
) -> np.ndarray:
"""
Poisson(lam) array sample.
Parameters
----------
lam : float
Expected value (lambda).
Returns
-------
np.ndarray
"""
return np.random.poisson(lam=lam, size=size).astype(dtype)
[docs]
def binomial_array(
n: int = 1,
p: float = 0.5,
size: tuple = (1, 10),
dtype: str = "int64",
) -> np.ndarray:
"""
Binomial(n, p) array sample.
Parameters
----------
n : int
Number of trials.
p : float
Success probability.
Returns
-------
np.ndarray
"""
return np.random.binomial(n=n, p=p, size=size).astype(dtype)
[docs]
def dirichlet_array(alpha: Sequence[float], size: int = 1) -> np.ndarray:
"""
Dirichlet(alpha) sampling.
Parameters
----------
alpha : sequence of float
Concentration parameters (> 0).
size : int
Number of samples.
Returns
-------
np.ndarray
Shape (size, len(alpha)).
"""
alpha_arr = np.asarray(alpha, dtype="float64")
return np.random.dirichlet(alpha_arr, size=size)
[docs]
def multivariate_normal_array(
mean: Sequence[float],
cov: np.ndarray,
size: int = 1,
dtype: str = "float64",
) -> np.ndarray:
"""
Multivariate normal N(mean, cov) sampling.
Parameters
----------
mean : sequence of float
Mean vector.
cov : np.ndarray
Covariance matrix.
size : int
Number of samples.
Returns
-------
np.ndarray
Shape (size, dim).
"""
mean_arr = np.asarray(mean, dtype="float64")
cov_arr = np.asarray(cov, dtype="float64")
return np.random.multivariate_normal(mean_arr, cov_arr, size=size).astype(dtype)
# ---------------------------------------------------------------------
# Cached sigma perturbations for 6D state vectors
# ---------------------------------------------------------------------
[docs]
def get_sigmas(n: int = 25, path: Optional[str] = None) -> np.ndarray:
"""
Return an (n, 6) array of random sigma perturbations, cached on disk.
Each row is:
[dx, dy, dz, dvx, dvy, dvz]
Generation logic (when cache is recomputed):
* Positions are uniformly sampled inside a 3D ball of radius 10.
* Velocities are uniformly sampled inside a 3D ball of radius 1.
Cache path resolution when `path` is None:
1) Use directory from env var ssapy_toolkit_CACHE
(file name: covariance_sigmas.npy)
2) Fallback to ~/.cache/ssapy_toolkit/covariance_sigmas.npy
If `path` is provided, it is treated as an explicit file path.
Parameters
----------
n : int, default 25
Number of sigma samples (rows).
path : str or None
Optional explicit path for the cache file.
Returns
-------
np.ndarray
Array of shape (n, 6).
"""
if path is None:
from .IO import yudata # type: ignore # kept as in original
# Prefer env-provided directory, then fallback to ~/.cache/ssapy_toolkit
env_dir = os.environ.get(ENV_VAR, "").strip()
dirs = []
if env_dir:
dirs.append(Path(env_dir).expanduser())
dirs.append(Path.home() / ".cache" / "ssapy_toolkit")
# Use datapath to choose/create the first usable directory
path = Path(yudata(DEFAULT_FILE, dirs=dirs))
else:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
compute_new = True
sigmas: Optional[np.ndarray] = None
if path.exists():
print("Loading sigmas.")
try:
sigmas = np.load(path)
compute_new = not (
sigmas.ndim == 2 and sigmas.shape[0] == n and sigmas.shape[1] == 6
)
except Exception:
# Corrupt or incompatible file -> recompute
compute_new = True
if compute_new:
print("Computing sigmas.")
sigmas = np.zeros((n, 6), dtype="float64")
for i in range(n):
x_dist, v_dist = 10.0, 1.0
while True:
# Sample uniformly in a cube, then reject if outside sphere
tmpx, tmpy, tmpz = np.random.uniform(-x_dist, x_dist, 3)
tmpvx, tmpvy, tmpvz = np.random.uniform(-v_dist, v_dist, 3)
if (
np.sqrt(tmpx**2 + tmpy**2 + tmpz**2) <= x_dist
and np.sqrt(tmpvx**2 + tmpvy**2 + tmpvz**2) <= v_dist
):
break
sigmas[i, :] = [tmpx, tmpy, tmpz, tmpvx, tmpvy, tmpvz]
np.save(path, sigmas)
return sigmas
def _sample_3d_offset(
scale: float,
distribution: str = "uniform",
rng: Optional[np.random.Generator] = None,
) -> np.ndarray:
"""
Internal helper: draw a 3D offset vector with isotropic direction.
Parameters
----------
scale : float
A characteristic length/velocity scale (e.g. max radius or sigma).
distribution : {'uniform', 'normal', 'gaussian', 'shell', 'laplace'}
- 'uniform' : uniform inside a solid ball of radius = scale
- 'normal'/'gaussian' : each component ~ N(0, scale^2)
- 'shell' : isotropic direction with fixed radius = scale
- 'laplace' : i.i.d Laplace(0, scale) per component
rng : np.random.Generator or None
Optional RNG. If None, uses np.random.default_rng().
Returns
-------
np.ndarray
Shape (3,), the sampled 3D offset.
"""
if scale <= 0:
return np.zeros(3, dtype="float64")
if rng is None:
rng = np.random.default_rng()
distribution = distribution.lower()
if distribution == "uniform":
# Uniform in a solid ball of radius = scale
# Radius ~ U(0,1)^(1/3) * scale, direction isotropic
direction = rng.normal(size=3)
norm = np.linalg.norm(direction)
if norm == 0.0:
direction = np.array([1.0, 0.0, 0.0], dtype="float64")
norm = 1.0
direction = direction / norm
u = rng.random()
radius = scale * u ** (1.0 / 3.0)
return radius * direction
if distribution in ("normal", "gaussian"):
# Isotropic Gaussian: components i.i.d. N(0, scale^2)
return rng.normal(loc=0.0, scale=scale, size=3)
if distribution in ("shell", "surface"):
# Uniform direction on sphere, fixed radius = scale
direction = rng.normal(size=3)
norm = np.linalg.norm(direction)
if norm == 0.0:
direction = np.array([1.0, 0.0, 0.0], dtype="float64")
norm = 1.0
direction = direction / norm
return scale * direction
if distribution == "laplace":
# Components i.i.d. Laplace(0, scale)
return rng.laplace(loc=0.0, scale=scale, size=3)
raise ValueError(
f"Unknown distribution '{distribution}'. "
"Choose from: 'uniform', 'normal', 'gaussian', 'shell', 'surface', 'laplace'."
)
[docs]
def perturb_state_3d(
r: np.ndarray,
v: np.ndarray,
pos_scale: float = 1.0,
vel_scale: float = 0.1,
pos_distribution: str = "uniform",
vel_distribution: str = "uniform",
rng: Optional[np.random.Generator] = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Perturb a 3D position and velocity with isotropic spherical uncertainty.
Parameters
----------
r : array-like, shape (3,)
Nominal 3D position vector.
v : array-like, shape (3,)
Nominal 3D velocity vector.
pos_scale : float, default 1.0
Characteristic scale of the position perturbation.
Interpretation depends on `pos_distribution`:
- 'uniform' : max radius of solid ball
- 'normal' : standard deviation per component
- 'shell' : fixed radius
- 'laplace' : Laplace scale per component
vel_scale : float, default 0.1
Same idea as `pos_scale`, but for the velocity perturbation.
pos_distribution : {'uniform', 'normal', 'gaussian', 'shell', 'surface', 'laplace'}
Distribution for position offset (see `_sample_3d_offset`).
vel_distribution : {'uniform', 'normal', 'gaussian', 'shell', 'surface', 'laplace'}
Distribution for velocity offset.
rng : np.random.Generator or None
Optional RNG for reproducibility.
Returns
-------
(r_pert, v_pert) : tuple of np.ndarray
Perturbed position and velocity, each shape (3,).
Examples
--------
>>> r = np.array([7000e3, 0.0, 0.0])
>>> v = np.array([0.0, 7.5e3, 0.0])
>>> r_p, v_p = perturb_state_3d(r, v, pos_scale=100.0, vel_scale=0.1,
... pos_distribution='uniform',
... vel_distribution='normal')
"""
r = np.asarray(r, dtype="float64")
v = np.asarray(v, dtype="float64")
if r.shape != (3,) or v.shape != (3,):
raise ValueError(
f"r and v must be 3-vectors of shape (3,), got {r.shape} and {v.shape}"
)
if rng is None:
rng = np.random.default_rng()
delta_r = _sample_3d_offset(
scale=pos_scale,
distribution=pos_distribution,
rng=rng,
)
delta_v = _sample_3d_offset(
scale=vel_scale,
distribution=vel_distribution,
rng=rng,
)
return r + delta_r, v + delta_v