Source code for fitting.estimators

"""
Initial parameter estimators for curve fitting.

Inputs (x, y) are assumed to be finite and already validated/cleaned by the caller.
This module provides functions to estimate initial parameter values and
ranges for various model types (linear, polynomial, trigonometric,
gaussian, logistic, exponential, etc.) to improve fitting convergence.
"""

# Standard library
from typing import Any, List, Tuple

# Numerical library
import numpy as np
from scipy.signal import find_peaks

# Local imports
from i18n import t
from utils import get_logger

logger = get_logger(__name__)

# Minimum amplitude to avoid division by zero in phase estimation
_MIN_AMPLITUDE = 1e-10
# Minimum denominator to avoid singularities in least-squares
_EPS_DENOM = 1e-30
# Maximum number of lags for autocorrelation period estimation
_MAX_AUTOCORR_LAG = 200


[docs] def estimate_trigonometric_parameters(x: Any, y: Any) -> Tuple[float, float]: """ Estimate initial parameters for trigonometric functions (sin/cos). Estimates amplitude (a) and angular frequency (b) for y = a * sin(b*x) or y = a * cos(b*x). Uses peak detection first, with autocorrelation fallback when peaks are insufficient. Args: x: Independent variable array y: Dependent variable array Returns: Tuple (amplitude, frequency). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) y_centered = y - np.mean(y) y_range = float(np.ptp(y)) amplitude = y_range / 2.0 if np.abs(amplitude) < _MIN_AMPLITUDE: amplitude = 1.0 frequency = 1.0 x_range = float(np.ptp(x)) if x_range < _EPS_DENOM: # No range in x: cannot estimate period; avoid division by zero below logger.debug( t( "log.estimated_trig_parameters", amplitude=f"{amplitude:.3f}", frequency=f"{frequency:.3f}", ) ) return amplitude, frequency # 1) Try peak-based period try: min_dist = max(1, len(x) // 10) peaks, _ = find_peaks(np.abs(y_centered), distance=min_dist) if len(peaks) >= 2: peak_distances = np.diff(x[peaks]) if len(peak_distances) > 0 and np.median(peak_distances) > 0: estimated_period = 2.0 * float(np.median(peak_distances)) if estimated_period > _EPS_DENOM: frequency = 2.0 * np.pi / estimated_period else: frequency = 2.0 * np.pi / x_range except Exception as e: logger.warning(t("log.peak_detection_failed", error=str(e))) # 2) Fallback: autocorrelation to estimate period if frequency <= 0 or frequency > 1e8: try: n = min(len(y_centered), _MAX_AUTOCORR_LAG) acf = np.correlate(y_centered[:n], y_centered[:n], mode="full") acf = acf[len(acf) // 2 :] if len(acf) > 2: peaks_acf, _ = find_peaks(acf, distance=max(1, n // 10)) if len(peaks_acf) >= 2: lag_diff = np.diff(peaks_acf) if len(lag_diff) > 0 and np.median(lag_diff) > 0: dx = ( float(np.median(np.diff(x))) if len(x) > 1 else x_range / max(1, len(x)) ) period = float(np.median(lag_diff)) * dx if period > _EPS_DENOM: frequency = 2.0 * np.pi / period if not (0 < frequency <= 1e8): frequency = 2.0 * np.pi / x_range except Exception as e: logger.warning(t("log.peak_detection_failed", error=str(e))) frequency = 2.0 * np.pi / x_range frequency = np.clip(float(frequency), 1e-8, 1e8) logger.debug( t( "log.estimated_trig_parameters", amplitude=f"{amplitude:.3f}", frequency=f"{frequency:.3f}", ) ) return amplitude, frequency
[docs] def estimate_phase_shift(x: Any, y: Any, amplitude: float, frequency: float) -> float: """ Estimate initial phase shift for y = a * sin(b*x + c) or y = a * cos(b*x + c). Args: x: Independent variable array y: Dependent variable array amplitude: Estimated amplitude (a) frequency: Estimated angular frequency (b) Returns: Estimated phase shift (c). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) if np.abs(amplitude) < _MIN_AMPLITUDE: return 0.0 try: y_normalized = y / amplitude y_normalized = np.clip(y_normalized, -1.0, 1.0) first_max_idx = int(np.argmax(y_normalized)) x_at_max = float(x[first_max_idx]) # Phase such that at x_at_max the sine is at its max: b*x + c = pi/2 phase = np.pi / 2.0 - frequency * x_at_max phase = float(np.arctan2(np.sin(phase), np.cos(phase))) except Exception as e: logger.warning(t("log.phase_estimation_failed", error=str(e))) phase = 0.0 logger.debug(t("log.estimated_phase_shift", phase=f"{phase:.3f}")) return phase
[docs] def estimate_linear_parameters(x: Any, y: Any) -> Tuple[float, float]: """ Estimate initial parameters for linear fit y = m*x + n (intercept n, slope m). Args: x: Independent variable array y: Dependent variable array Returns: Tuple (n, m): intercept and slope. """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) if len(x) < 2: return float(np.mean(y)), 0.0 coefs = np.polyfit(x, y, 1) return float(coefs[1]), float(coefs[0])
[docs] def estimate_polynomial_parameters(x: Any, y: Any, degree: int) -> List[float]: """ Estimate initial parameters for polynomial y = c0 + c1*x + ... + c_degree*x^degree. Args: x: Independent variable array. y: Dependent variable array. degree: Polynomial degree. Returns: List of coefficients [c0, c1, ..., c_degree] (constant term first). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) if len(x) <= degree: out = [0.0] * (degree + 1) out[0] = float(np.mean(y)) return out coefs = np.polyfit(x, y, degree) return list(reversed(coefs))
[docs] def estimate_single_power_parameter(x: Any, y: Any, power: int) -> float: """ Estimate coefficient a for y = a * x^power (no constant term). Least-squares: a = sum(y * x^power) / sum(x^(2*power)). Args: x: Independent variable array y: Dependent variable array power: Exponent (e.g. 2 for quadratic through origin) Returns: Estimated coefficient a. """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) xp = x**power denom = np.sum(xp * xp) if denom < _EPS_DENOM: return 1.0 return float(np.sum(y * xp) / denom)
[docs] def estimate_ln_parameter(x: Any, y: Any) -> float: """ Estimate initial parameter a for y = a * ln(x). Least squares with no intercept; positive x only. Args: x: Independent variable array (positive) y: Dependent variable array Returns: Estimated coefficient a. """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) ln_x = np.log(x) denom = np.sum(ln_x * ln_x) if denom < _EPS_DENOM: return float(np.mean(y) / (np.mean(ln_x) + _EPS_DENOM)) return float(np.sum(y * ln_x) / denom)
[docs] def estimate_inverse_parameter(x: Any, y: Any, power: int) -> float: """ Estimate coefficient a for y = a / x^power. Uses median of (y * x^power) for robustness to outliers. Args: x: Independent variable array (avoid zeros) y: Dependent variable array power: Power in denominator (1 or 2) Returns: Estimated coefficient a. """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) return float(np.median(y * (x**power)))
[docs] def estimate_gaussian_parameters(x: Any, y: Any) -> Tuple[float, float, float]: """ Estimate initial (A, mu, sigma) for y = A * exp(-(x-mu)^2 / (2*sigma^2)). A from max(y), mu from x at max, sigma from FWHM (half-max crossings). Args: x: Independent variable array y: Dependent variable array Returns: Tuple (A, mu, sigma). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) idx_max = int(np.argmax(y)) A_0 = float(y[idx_max]) mu_0 = float(x[idx_max]) y_min = float(np.min(y)) y_range = A_0 - y_min if y_range < _EPS_DENOM: x_range = float(np.ptp(x)) return A_0, mu_0, max(x_range / 4.0, 1.0) if x_range > 0 else 1.0 half_max = y_min + 0.5 * y_range above = np.where(y >= half_max)[0] if len(above) >= 2: # FWHM ≈ 2*sqrt(2*ln(2))*sigma => sigma = FWHM / (2*sqrt(2*ln(2))) x_above = x[above] width = float(np.max(x_above) - np.min(x_above)) sigma_0 = width / (2.0 * np.sqrt(2.0 * np.log(2.0))) else: x_range = float(np.ptp(x)) sigma_0 = x_range / 4.0 if x_range > 0 else 1.0 if sigma_0 <= 0: sigma_0 = 1.0 return A_0, mu_0, sigma_0
[docs] def estimate_binomial_parameters(x: Any, y: Any) -> Tuple[float, float, float]: """ Estimate (a, b, c) for logistic y = a / (1 + exp(-b*(x-c))). a = range, c = midpoint of transition, b from inverse of transition width. Args: x: Independent variable array y: Dependent variable array Returns: Tuple (a, b, c). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) y_min = float(np.min(y)) y_max = float(np.max(y)) a_0 = (y_max - y_min) if (y_max - y_min) > _EPS_DENOM else 1.0 mid_level = y_min + 0.5 * (y_max - y_min) order = np.argsort(x) x_s = x[order] y_s = y[order] if mid_level <= y_s[0]: c_0 = float(x_s[0]) elif mid_level >= y_s[-1]: c_0 = float(x_s[-1]) else: i = np.searchsorted(y_s, mid_level) t_val = (mid_level - y_s[i - 1]) / (y_s[i] - y_s[i - 1] + _EPS_DENOM) c_0 = float((1.0 - t_val) * x_s[i - 1] + t_val * x_s[i]) x_range = float(np.ptp(x)) if x_range < _EPS_DENOM: x_range = 1.0 b_0 = 4.0 / x_range return a_0, b_0, c_0
[docs] def estimate_exponential_parameters(x: Any, y: Any) -> Tuple[float, float]: """ Estimate (a, b) for y = a * exp(b*x). Uses log(y) = log(a) + b*x when y > 0; otherwise fallback from endpoints. Args: x: Independent variable array y: Dependent variable array Returns: Tuple (a, b). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) x_range = float(np.ptp(x)) if x_range < 1e-12: x_range = 1.0 b_max = 700.0 / x_range if np.all(y > 1e-15): log_y = np.log(np.maximum(y, 1e-300)) slope, intercept = np.polyfit(x, log_y, 1) b_0 = float(np.clip(slope, -b_max + 0.01, b_max - 0.01)) a_0 = float(np.exp(intercept)) else: a_0 = float(y[0]) if np.abs(y[0]) > 1e-12 else 1.0 if np.abs(a_0) < 1e-12: a_0 = 1.0 dx = x[-1] - x[0] if np.abs(dx) > 1e-12: end_ratio = np.abs(y[-1]) / (np.abs(y[0]) + 1e-300) b_0 = float( np.clip(np.log(end_ratio + 1e-12) / dx, -b_max + 0.01, b_max - 0.01) ) else: b_0 = 0.0 return a_0, b_0
[docs] def estimate_square_pulse_parameters(x: Any, y: Any) -> Tuple[float, float, float]: """ Estimate (A, t0, w) for a smooth square pulse: amplitude, center time, width. A from peak-to-peak, t0 from center of mass of ``y`` (absolute value), w from support of elevated region. Args: x: Independent variable array (e.g. time) y: Dependent variable array Returns: Tuple (A, t0, w). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) A_0 = float(np.ptp(y)) or 1.0 idx_max = int(np.argmax(y)) t0_0 = float(x[idx_max]) x_range = float(np.ptp(x)) half = np.min(y) + 0.5 * (np.max(y) - np.min(y)) above = np.where(y >= half)[0] if len(above) >= 2: w_0 = float(np.ptp(x[above])) else: w_0 = x_range / 5.0 if x_range > 0 else 1.0 if w_0 <= 0: w_0 = 1.0 return A_0, t0_0, w_0
[docs] def estimate_hyperbolic_parameters(x: Any, y: Any) -> Tuple[float, float]: """ Estimate initial parameters for hyperbolic functions (sinh/cosh). For y = a * sinh(b*x) or y = a * cosh(b*x): amplitude (a) from half the y range, frequency (b) from inverse of x range. Args: x: Independent variable array y: Dependent variable array Returns: Tuple (amplitude, frequency). """ x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) y_range = float(np.ptp(y)) amplitude = y_range / 2.0 if y_range > 0 else 1.0 x_range = float(np.ptp(x)) frequency = 1.0 / x_range if x_range > _EPS_DENOM else 1.0 return amplitude, frequency
[docs] def estimate_hyperbolic_bounds( x: Any, ) -> Tuple[Tuple[float, float], Tuple[float, float]]: """ Return parameter bounds for hyperbolic fits (sinh/cosh) to avoid overflow. For y = a * sinh(b*x) or y = a * cosh(b*x): a can be any real, b must be bounded so that b*max(abs(x)) does not cause overflow in exp. Args: x: Independent variable array Returns: Pair (lower_bounds, upper_bounds), each of length 2 for (a, b). """ x = np.asarray(x, dtype=float) x_max_abs = float(np.max(np.abs(x))) + _EPS_DENOM b_max = min(700.0 / x_max_abs, 1e3) return ([-np.inf, 1e-9], [np.inf, b_max])