Source code for spotgp.observations

"""
observations.py — Time series data container with PSD and ACF computation.
"""

import numpy as np
from .psd import compute_psd

__all__ = ["TimeSeriesData"]


[docs] class TimeSeriesData: """ Container for observed time series data. Parameters ---------- x : array_like, shape (N,) Observation times. y : array_like, shape (N,) Observed values (e.g. flux). yerr : array_like, shape (N,) or float Measurement uncertainties. A scalar is broadcast to all points. normalize : bool If True (default), normalize the flux so that the median is 1 and scale yerr accordingly. """ def __init__(self, x, y, yerr, normalize=False, zero_mean=False): x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) yerr = np.asarray(yerr, dtype=float) yerr = np.broadcast_to(yerr, x.shape).copy() if x.shape != y.shape: raise ValueError( f"x and y must have the same shape, " f"got {x.shape} and {y.shape}") if x.shape != yerr.shape: raise ValueError( f"x and yerr must have the same shape, " f"got {x.shape} and {yerr.shape}") # Mask out non-finite values mask = np.isfinite(x) & np.isfinite(y) & np.isfinite(yerr) self.x = x[mask] self.y = y[mask] self.yerr = yerr[mask] if normalize: self.normalize() if zero_mean: self.y = self.y - np.mean(self.y) @property def N(self) -> int: """Number of data points.""" return len(self.x) @property def baseline(self) -> float: """Total time baseline.""" return float(self.x[-1] - self.x[0]) @property def median_dt(self) -> float: """Median time step.""" return float(np.median(np.diff(self.x)))
[docs] def normalize(self): """ Normalize the flux so that the median equals 1. Divides ``y`` and ``yerr`` by the median of ``y``. This is idempotent: calling it on already-normalized data (median ~ 1) has negligible effect. """ median = np.median(self.y) if median != 0: self.yerr = self.yerr / np.abs(median) self.y = self.y / median
[docs] def detrend(self, window): """ Subtract a rolling-median trend from the flux. A median is computed in a sliding window of width ``window`` (in the same units as ``x``, typically days). The trend is subtracted and the global median is added back so the baseline stays near the original level. Parameters ---------- window : float Window width for the rolling median [same units as x]. """ window = float(window) trend = np.empty_like(self.y) for i in range(len(self.x)): mask = np.abs(self.x - self.x[i]) <= window / 2.0 trend[i] = np.median(self.y[mask]) self.y = self.y - trend + np.median(self.y)
[docs] def sigma_clip(self, lower=3.0, upper=3.0): """ Remove outliers beyond a sigma threshold. Masks out points where ``y`` falls below ``mean(y) - |lower| * std(y)`` or above ``mean(y) + |upper| * std(y)``. Parameters ---------- lower : float Number of standard deviations below the mean (default 3). upper : float Number of standard deviations above the mean (default 3). """ mean = np.mean(self.y) std = np.std(self.y) lo = mean - np.abs(lower) * std hi = mean + np.abs(upper) * std mask = (self.y >= lo) & (self.y <= hi) self.x = self.x[mask] self.y = self.y[mask] self.yerr = self.yerr[mask]
[docs] def downsample(self, dt): """ Downsample the time series into non-overlapping bins of width ``dt``. Within each bin the flux is the inverse-variance weighted mean and the uncertainty is propagated as: yerr_bin = 1 / sqrt(sum(1 / yerr_i^2)) Bins containing zero points are dropped. Parameters ---------- dt : float Bin width in the same units as ``x``. """ dt = float(dt) bin_edges = np.arange(self.x[0], self.x[-1] + dt, dt) bin_idx = np.digitize(self.x, bin_edges) - 1 x_new, y_new, yerr_new = [], [], [] for i in range(len(bin_edges) - 1): mask = bin_idx == i if not np.any(mask): continue err_bin = self.yerr[mask] y_bin = self.y[mask] x_new.append(np.mean(self.x[mask])) if np.all(err_bin == 0): n = np.sum(mask) y_new.append(np.mean(y_bin)) yerr_new.append(np.std(y_bin) / np.sqrt(n) if n > 1 else 0.0) else: w = 1.0 / err_bin ** 2 w_sum = np.sum(w) y_new.append(np.sum(w * y_bin) / w_sum) yerr_new.append(1.0 / np.sqrt(w_sum)) self._x_full = self.x self._y_full = self.y self._yerr_full = self.yerr self.x = np.array(x_new) self.y = np.array(y_new) self.yerr = np.array(yerr_new)
[docs] def compute_psd(self, normalization="psd", freq_min=None, freq_max=None, n_freq=None, samples_per_peak=5): """ Compute the Lomb-Scargle power spectral density. Parameters ---------- normalization : str LombScargle normalization mode (default "psd"). freq_min, freq_max : float, optional Frequency bounds. n_freq : int, optional Number of frequency grid points. samples_per_peak : float, optional Frequency grid density (default 5). Returns ------- freq : ndarray Frequencies [cycles per unit time]. power : ndarray PSD at each frequency. """ x = getattr(self, '_x_full', self.x) y = getattr(self, '_y_full', self.y) freq, power = compute_psd( y, t=x, normalization=normalization, freq_min=freq_min, freq_max=freq_max, n_freq=n_freq, samples_per_peak=samples_per_peak, ) self.psd_freq = freq self.psd_power = power return freq, power
[docs] def compute_acf(self, n_bins=None, max_lag=None): """ Compute the empirical autocorrelation function from irregularly sampled data using binned lag pairs. For each pair of observations (i, j), the lag ``|x_i - x_j|`` and product ``(y_i - mean)(y_j - mean)`` are accumulated into bins. The ACF is the mean product in each bin, normalized by the variance. Parameters ---------- n_bins : int, optional Number of lag bins (default: N // 2). max_lag : float, optional Maximum lag to compute (default: half the baseline). Returns ------- lag_centers : ndarray, shape (n_bins,) Center of each lag bin. acf : ndarray, shape (n_bins,) Normalized autocorrelation in each bin. """ x = getattr(self, '_x_full', self.x) y = getattr(self, '_y_full', self.y) N = len(x) y_centered = y - np.mean(y) var = np.var(y) if max_lag is None: max_lag = self.baseline / 2.0 if n_bins is None: n_bins = max(N // 2, 10) bin_edges = np.linspace(0, max_lag, n_bins + 1) lag_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) acf_sum = np.zeros(n_bins) acf_count = np.zeros(n_bins) for i in range(N): lags = np.abs(x[i + 1:] - x[i]) products = y_centered[i] * y_centered[i + 1:] bin_idx = np.searchsorted(bin_edges[1:], lags) valid = bin_idx < n_bins np.add.at(acf_sum, bin_idx[valid], products[valid]) np.add.at(acf_count, bin_idx[valid], 1) nonempty = acf_count > 0 acf = np.zeros(n_bins) acf[nonempty] = acf_sum[nonempty] / (acf_count[nonempty] * var) self.acf_lags = lag_centers self.acf_values = acf self.acf_counts = acf_count.astype(int) return lag_centers, acf
[docs] @classmethod def from_lightkurve(cls, lc, normalize=False, zero_mean=False): """ Create a TimeSeriesData from a lightkurve LightCurve object. Extracts time, flux, and flux_err arrays, removes NaN entries, and converts Astropy quantities to plain floats if needed. Parameters ---------- lc : lightkurve.LightCurve A LightCurve object (e.g. from ``lk.search_lightcurve(...).download()``). normalize : bool If True (default), normalize flux to median of 1. zero_mean : bool If True, subtract the mean from the flux. Returns ------- TimeSeriesData """ # Extract arrays, handling Astropy Quantity objects time = np.asarray(lc.time.value, dtype=float) flux = np.asarray(lc.flux.value, dtype=float) if hasattr(lc.flux, 'value') else np.asarray(lc.flux, dtype=float) if lc.flux_err is not None and np.any(np.isfinite( np.asarray(lc.flux_err.value if hasattr(lc.flux_err, 'value') else lc.flux_err))): flux_err = np.asarray( lc.flux_err.value if hasattr(lc.flux_err, 'value') else lc.flux_err, dtype=float) else: flux_err = np.full_like(flux, np.nanmedian(np.abs(np.diff(flux)))) # NaN/inf masking is handled by __init__ return cls(time, flux, flux_err, normalize=normalize, zero_mean=zero_mean)
[docs] def plot(self, ax=None, color="k", alpha=0.6, marker=".", ms=2, xlabel="Time", ylabel="Flux", **kwargs): """ Plot the time series. Parameters ---------- ax : matplotlib Axes, optional Axes to plot on. If None, creates a new figure. color, alpha, marker, ms : plot style options. xlabel, ylabel : str Axis labels. **kwargs Extra keyword arguments passed to ``ax.errorbar``. Returns ------- ax : matplotlib Axes """ import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(12, 4)) ax.errorbar(self.x, self.y, yerr=self.yerr, fmt=marker, color=color, alpha=alpha, ms=ms, elinewidth=0.5, **kwargs) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) return ax
[docs] def plot_psd(self, ax=None, color="k", lw=1.0, loglog=True, xlabel="Frequency", ylabel="Power", **psd_kwargs): """ Plot the Lomb-Scargle PSD. Calls ``compute_psd()`` if it hasn't been run yet (or if ``psd_kwargs`` are provided to override previous settings). Parameters ---------- ax : matplotlib Axes, optional color, lw : plot style options. loglog : bool If True (default), use log-log axes. xlabel, ylabel : str Axis labels. **psd_kwargs Passed to ``compute_psd()``. Returns ------- ax : matplotlib Axes """ import matplotlib.pyplot as plt if psd_kwargs or not hasattr(self, "psd_freq"): self.compute_psd(**psd_kwargs) if ax is None: fig, ax = plt.subplots(figsize=(8, 4)) if loglog: ax.loglog(self.psd_freq, self.psd_power, color=color, lw=lw) else: ax.plot(self.psd_freq, self.psd_power, color=color, lw=lw) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) return ax
[docs] def plot_acf(self, ax=None, color="k", lw=1.5, xlabel="Time lag", ylabel="ACF", **acf_kwargs): """ Plot the empirical autocorrelation function. Calls ``compute_acf()`` if it hasn't been run yet (or if ``acf_kwargs`` are provided to override previous settings). Parameters ---------- ax : matplotlib Axes, optional color, lw : plot style options. xlabel, ylabel : str Axis labels. **acf_kwargs Passed to ``compute_acf()``. Returns ------- ax : matplotlib Axes """ import matplotlib.pyplot as plt if acf_kwargs or not hasattr(self, "acf_lags"): self.compute_acf(**acf_kwargs) if ax is None: fig, ax = plt.subplots(figsize=(8, 4)) ax.plot(self.acf_lags, self.acf_values, color=color, lw=lw) ax.axhline(0, color="gray", ls="--", lw=0.5) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) return ax
def __repr__(self) -> str: return (f"TimeSeriesData(N={self.N}, " f"baseline={self.baseline:.2f}, " f"median_dt={self.median_dt:.4f})")