"""Functions for generating least-squares model fits for linear models."""
from __future__ import annotations
import attr
import attrs
import numpy as np
import yaml
from abc import ABCMeta, abstractmethod
from cached_property import cached_property
from copy import copy
from edges_io.h5 import register_h5type
from hickleable import hickleable
from typing import Sequence, Type, Union
from . import receiver_calibration_func as rcf
from .simulate import simulate_q_from_calobs
from .tools import as_readonly
F_CENTER = 75.0
_MODELS = {}
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class FixedLinearModel(yaml.YAMLObject):
"""
A base class for a linear model fixed at a certain set of co-ordinates.
Using this class caches the basis functions at the particular coordinates, and thus
speeds up the fitting of multiple sets of data at those co-ordinates.
Parameters
----------
model
The linear model to evaluate at the co-ordinates
x
A set of co-ordinates at which to evaluate the model.
init_basis
If the basis functions of the model, evaluated at x, are known already, they
can be input directly to save computation time.
"""
yaml_tag = "!Model"
model: Model = attrs.field()
x: np.ndarray = attrs.field(converter=np.asarray)
_init_basis: np.ndarray | None = attrs.field(
default=None, converter=attrs.converters.optional(np.asarray)
)
[docs] @classmethod
def to_yaml(cls, dumper, data):
"""Method to convert to YAML format."""
return _model_yaml_representer(dumper, data.model)
@model.validator
def _model_vld(self, att, val):
assert isinstance(val, (Model, CompositeModel))
@_init_basis.validator
def _init_basis_vld(self, att, val):
if val is None:
return None
if val.shape[1] != len(self.x):
raise ValueError("The init_basis values must be the same shape as x.")
@property
def n_terms(self):
"""The number of terms/parameters in the model."""
return self.model.n_terms
@cached_property
def basis(self) -> np.ndarray:
"""The (cached) basis functions at default_x.
Shape ``(n_terms, x)``.
"""
out = np.zeros((self.model.n_terms, len(self.x)))
for indx in range(self.model.n_terms):
if self._init_basis is not None and indx < len(self._init_basis):
out[indx] = self._init_basis[indx]
else:
out[indx] = self.model.get_basis_term_transformed(indx, self.x)
return out
def __call__(
self,
x: np.ndarray | None = None,
parameters: Sequence | None = None,
indices: Sequence | None = None,
) -> np.ndarray:
"""Evaluate the model.
Parameters
----------
x
The coordinates at which to evaluate the model (by default, use ``self.x``).
params
A list/array of parameters at which to evaluate the model. Will use the
instance's parameters if available. If using a subset of the basis
functions, you can pass a subset of parameters.
indices
Sequence of parameters indices to use (other parameters are set to zero).
Returns
-------
model
The model evaluated at the input ``x``.
"""
return self.model(
basis=self.basis if x is None else None,
x=x,
parameters=parameters,
indices=indices,
)
[docs] def fit(
self,
ydata: np.ndarray,
weights: np.ndarray | float = 1.0,
xdata: np.ndarray | None = None,
):
"""Create a linear-regression fit object."""
thing = self.at_x(xdata) if xdata is not None else self
return ModelFit(
thing,
ydata=ydata,
weights=weights,
)
[docs] def at_x(self, x: np.ndarray) -> FixedLinearModel:
"""Return a new :class:`FixedLinearModel` at given co-ordinates."""
return attrs.evolve(self, x=x, init_basis=None)
[docs] def with_nterms(
self, n_terms: int, parameters: Sequence | None = None
) -> FixedLinearModel:
"""Return a new :class:`FixedLinearModel` with given nterms and parameters."""
init_basis = as_readonly(self.basis[: min(self.model.n_terms, n_terms)])
model = self.model.with_nterms(n_terms=n_terms, parameters=parameters)
return attrs.evolve(self, model=model, init_basis=init_basis)
[docs] def with_params(self, parameters: Sequence) -> FixedLinearModel:
"""Return a new :class:`FixedLinearModel` with givne parameters."""
assert len(parameters) == self.model.n_terms
init_basis = as_readonly(self.basis)
model = self.model.with_params(parameters=parameters)
return attrs.evolve(self, model=model, init_basis=init_basis)
@property
def parameters(self) -> np.ndarray | None:
"""The parameters of the model, if set."""
return self.model.parameters
def _transform_yaml_constructor(
loader: yaml.SafeLoader, node: yaml.nodes.MappingNode
) -> Model:
mapping = loader.construct_mapping(node, deep=True)
return ModelTransform.get(node.tag[1:])(**mapping)
def _transform_yaml_representer(
dumper: yaml.SafeDumper, tr: ModelTransform
) -> yaml.nodes.MappingNode:
dct = attrs.asdict(tr, recurse=False)
return dumper.represent_mapping(f"!{tr.__class__.__name__}", dct)
[docs]def tuple_converter(x):
"""Convert input to tuple of floats."""
return tuple(float(xx) for xx in x)
[docs]@hickleable()
@register_h5type
@attr.s(frozen=True, kw_only=True, slots=False)
class Model(metaclass=ABCMeta):
"""A base class for a linear model."""
default_n_terms: int | None = None
n_terms_min: int = 1
n_terms_max: int = 1000000
parameters: Sequence | None = attrs.field(
default=None,
converter=attrs.converters.optional(tuple),
)
n_terms: int = attrs.field(converter=attrs.converters.optional(int))
transform: ModelTransform = attrs.field(default=IdentityTransform())
def __init_subclass__(cls, is_meta=False, **kwargs):
"""Initialize a subclass and add it to the registered models."""
super().__init_subclass__(**kwargs)
if not is_meta:
_MODELS[cls.__name__.lower()] = cls
@n_terms.default
def _n_terms_default(self):
if self.parameters is not None:
return len(self.parameters)
else:
return self.__class__.default_n_terms
@n_terms.validator
def _n_terms_validator(self, att, val):
if val is None:
raise ValueError("Either n_terms or explicit parameters must be given.")
if not (self.n_terms_min <= val <= self.n_terms_max):
raise ValueError(
f"n_terms must be between {self.n_terms_min} and {self.n_terms_max}"
)
if self.parameters is not None and val != len(self.parameters):
raise ValueError(f"Wrong number of parameters! Should be {val}.")
[docs] @abstractmethod
def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis terms for the model."""
pass
[docs] def get_basis_terms(self, x: np.ndarray) -> np.ndarray:
"""Get a 2D array of all basis terms at ``x``."""
x = self.transform(x)
return np.array([self.get_basis_term(indx, x) for indx in range(self.n_terms)])
[docs] def with_nterms(
self, n_terms: int | None = None, parameters: Sequence | None = None
) -> Model:
"""Return a new :class:`Model` with given nterms and parameters."""
if parameters is not None:
n_terms = len(parameters)
return attrs.evolve(self, n_terms=n_terms, parameters=parameters)
[docs] def with_params(self, parameters: Sequence | None):
"""Get new model with different parameters."""
assert len(parameters) == self.n_terms
return self.with_nterms(parameters=parameters)
[docs] @staticmethod
def from_str(model: str, **kwargs) -> Model:
"""Obtain a :class:`Model` given a string name."""
return get_mdl(model)(**kwargs)
[docs] def at(self, **kwargs) -> FixedLinearModel:
"""Get an evaluated linear model."""
return FixedLinearModel(model=self, **kwargs)
def __call__(
self,
x: np.ndarray | None = None,
basis: np.ndarray | None = None,
parameters: Sequence | None = None,
indices: Sequence[int] | None = None,
) -> np.ndarray:
"""Evaluate the model.
Parameters
----------
x : np.ndarray, optional
The co-ordinates at which to evaluate the model (by default, use
``default_x``).
basis : np.ndarray, optional
The basis functions at which to evaluate the model. This is useful if
calling the model multiple times, as the basis itself can be cached and
re-used.
params
A list/array of parameters at which to evaluate the model. Will use the
instance's parameters if available. If using a subset of the basis
functions, you can pass a subset of parameters.
indices
Specifies which parameters/basis functions to use. Default is all of them.
Returns
-------
model : np.ndarray
The model evaluated at the input ``x`` or ``basis``.
"""
if parameters is None and self.parameters is None:
raise ValueError("You must supply parameters to evaluate the model!")
if parameters is None:
parameters = np.asarray(self.parameters)
else:
parameters = np.asarray(parameters)
indices = np.arange(len(parameters)) if indices is None else np.array(indices)
if x is None and basis is None:
raise ValueError("You must supply either x or basis!")
if basis is None:
basis = self.get_basis_terms(x)
if any(idx >= len(basis) for idx in indices):
raise ValueError("Cannot use more basis sets than available!")
if len(parameters) != len(indices):
parameters = parameters[indices]
return np.dot(parameters, basis[indices])
[docs] def fit(
self,
xdata: np.ndarray,
ydata: np.ndarray,
weights: np.ndarray | float = 1.0,
) -> ModelFit:
"""Create a linear-regression fit object."""
return self.at(x=xdata).fit(ydata, weights=weights)
[docs]def get_mdl(model: str | type[Model]) -> type[Model]:
"""Get a linear model class from a string input."""
if isinstance(model, str):
return _MODELS[model]
elif np.issubclass_(model, Model):
return model
else:
raise ValueError("model needs to be a string or Model subclass")
[docs]def get_mdl_inst(model: str | Model | type[Model], **kwargs) -> Model:
"""Get a model instance from given string input."""
if isinstance(model, Model):
if kwargs:
return attrs.evolve(model, **kwargs)
else:
return model
return get_mdl(model)(**kwargs)
[docs]@hickleable()
@attr.s(frozen=True, kw_only=True, slots=False)
class Foreground(Model, is_meta=True):
"""
Base class for Foreground models.
Parameters
----------
f_center : float
A "center" or "reference" frequency. Typically models will have their
co-ordindates divided by this frequency before solving for the
co-efficients.
with_cmb : bool
Whether to add a simple CMB component to the foreground.
"""
with_cmb: bool = attrs.field(default=False, converter=bool)
f_center: float = attrs.field(default=F_CENTER, converter=float)
transform: ModelTransform = attrs.field()
@transform.default
def _tr_default(self):
return ScaleTransform(scale=self.f_center)
[docs]@hickleable()
@attr.s(frozen=True, kw_only=True, slots=False)
class PhysicalLin(Foreground):
"""Foreground model using a linearized physical model of the foregrounds."""
n_terms_max: int = 5
default_n_terms: int = 5
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis functions of the model."""
if indx < 3:
logy = np.log(x)
y25 = x**-2.5
return y25 * logy**indx
elif indx == 3:
return x**-4.5
elif indx == 4:
return 1 / (x * x)
else:
raise ValueError("too many terms supplied!")
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class Polynomial(Model):
r"""A polynomial foreground model.
Parameters
----------
log_x : bool
Whether to fit the poly coefficients with log-space co-ordinates.
offset : float
An offset to use for each index in the polynomial model.
kwargs
All other arguments passed through to :class:`Foreground`.
Notes
-----
The polynomial model can be written
.. math:: \sum_{i=0}^{n} c_i y^{i + offset},
where ``y`` is ``log(x)`` if ``log_x=True`` and simply ``x`` otherwise.
"""
offset: float = attrs.field(default=0, converter=float)
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis functions of the model."""
return x ** (indx + self.offset)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class EdgesPoly(Polynomial):
"""
Polynomial with an offset corresponding to approximate galaxy spectral index.
Parameters
----------
offset : float
The offset to use. Default is close to the Galactic spectral index.
kwargs
All other arguments are passed through to :class:`Polynomial`.
"""
offset: float = attrs.field(default=-2.5, converter=float)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True)
class LinLog(Foreground):
beta: float = attrs.field(default=-2.5, converter=float)
@property
def _poly(self):
return Polynomial(
transform=LogTransform(),
offset=0,
n_terms=self.n_terms,
parameters=self.parameters,
)
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis functions of the model."""
term = self._poly.get_basis_term_transformed(indx, x)
return term * x**self.beta
[docs]def LogPoly(**kwargs): # noqa: N802
"""A factory function for a LogPoly model."""
return Polynomial(transform=Log10Transform(), offset=0, **kwargs)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class Fourier(Model):
"""A Fourier-basis model."""
period: float = attrs.field(default=2 * np.pi, converter=float)
@cached_property
def _period_fac(self):
return 2 * np.pi / self.period
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis functions of the model."""
if indx == 0:
return np.ones_like(x)
elif indx % 2:
return np.cos(self._period_fac * ((indx + 1) // 2) * x)
else:
return np.sin(self._period_fac * ((indx + 1) // 2) * x)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class FourierDay(Model):
"""A Fourier-basis model with period of 24 (hours)."""
@property
def _fourier(self):
return Fourier(period=48.0, n_terms=self.n_terms, parameters=self.parameters)
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis functions of the model."""
return self._fourier.get_basis_term(indx, x)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class CompositeModel:
models: dict[str, Model] = attrs.field()
extra_basis: dict[str, np.ndarray] = attrs.field(factory=dict)
@extra_basis.validator
def _eb_vld(self, att, val):
assert all(v in self.models for v in val)
@cached_property
def n_terms(self) -> int:
"""The number of terms in the full composite model."""
return sum(m.n_terms for m in self.models.values())
@cached_property
def parameters(self) -> np.ndarray:
"""The read-only list of parameters of all sub-models."""
return np.concatenate(tuple(m.parameters for m in self.models.values()))
@cached_property
def _index_map(self):
_index_map = {}
indx = 0
for name, model in self.models.items():
for i in range(model.n_terms):
_index_map[indx] = (name, i)
indx += 1
return _index_map
def __getitem__(self, item):
"""Get sub-models as if they were top-level attributes."""
if item not in self.models:
raise KeyError(f"{item} not one of the models.")
return self.models[item]
def __getattr__(self, item):
"""Get sub-models as if they were top-level attributes."""
if item not in self.models:
raise AttributeError(f"{item} is not one of the models.")
return self[item]
def _get_model_param_indx(self, model: str):
indx = list(self.models.keys()).index(model)
n_before = sum(m.n_terms for m in list(self.models.values())[:indx])
model = self.models[model]
return slice(n_before, n_before + model.n_terms, 1)
@cached_property
def model_idx(self) -> dict[str, slice]:
"""Dictionary of parameter indices correponding to each model."""
return {name: self._get_model_param_indx(name) for name in self.models}
[docs] def get_model(
self,
model: str,
parameters: np.ndarray = None,
x: np.ndarray | None = None,
with_extra: bool = False,
):
"""Calculate a sub-model."""
indx = self.model_idx[model]
extra = self.get_extra_basis(model, x) if with_extra else 1
model = self.models[model]
if parameters is None:
parameters = self.parameters
p = parameters if len(parameters) == model.n_terms else parameters[indx]
return model(x=x, parameters=p) * extra
[docs] def get_basis_term(self, indx: int, x: np.ndarray) -> np.ndarray:
"""Define the basis terms for the model."""
model, indx = self._index_map[indx]
extra = self.get_extra_basis(model, x)
try:
mask = extra.astype(bool)
extra = extra[mask]
except AttributeError:
mask = np.ones(len(x), dtype=bool)
out = np.zeros_like(x)
out[mask] = self[model].get_basis_term(indx, x[mask]) * extra
return out
[docs] def get_basis_terms(self, x: np.ndarray) -> np.ndarray:
"""Get a 2D array of all basis terms at ``x``."""
return np.array(
[self.get_basis_term_transformed(indx, x) for indx in range(self.n_terms)]
)
[docs] def with_nterms(
self, model: str, n_terms: int | None = None, parameters: Sequence | None = None
) -> Model:
"""Return a new :class:`Model` with given nterms and parameters."""
model_ = self[model]
if parameters is not None:
n_terms = len(parameters)
model_ = model_.with_nterms(n_terms=n_terms, parameters=parameters)
return attrs.evolve(self, models={**self.models, **{model: model_}})
[docs] def with_params(self, parameters: Sequence):
"""Get a new model with specified parameters."""
assert len(parameters) == self.n_terms
models = {
name: model.with_params(
parameters=parameters[self._get_model_param_indx(name)]
)
for name, model in self.models.items()
}
return attrs.evolve(self, models=models)
[docs] def at(self, **kwargs) -> FixedLinearModel:
"""Get an evaluated linear model."""
return FixedLinearModel(model=self, **kwargs)
def __call__(
self,
x: np.ndarray | None = None,
basis: np.ndarray | None = None,
parameters: Sequence | None = None,
indices: Sequence | None = None,
) -> np.ndarray:
"""Evaluate the model.
Parameters
----------
x : np.ndarray, optional
The co-ordinates at which to evaluate the model (by default, use
``default_x``).
basis : np.ndarray, optional
The basis functions at which to evaluate the model. This is useful if
calling the model multiple times, as the basis itself can be cached and
re-used.
params
A list/array of parameters at which to evaluate the model. Will use the
instance's parameters if available. If using a subset of the basis
functions, you can pass a subset of parameters.
Returns
-------
model : np.ndarray
The model evaluated at the input ``x`` or ``basis``.
"""
return Model.__call__(
self, x=x, basis=basis, parameters=parameters, indices=indices
)
[docs] def fit(
self,
xdata: np.ndarray,
ydata: np.ndarray,
weights: np.ndarray | float = 1.0,
) -> ModelFit:
"""Create a linear-regression fit object."""
return self.at(x=xdata).fit(ydata, weights=weights)
[docs]@hickleable()
@attrs.define(frozen=True, slots=False)
class ComplexRealImagModel(yaml.YAMLObject):
"""A composite model that is specifically for complex functions in real/imag."""
yaml_tag = "ComplexRealImagModel"
real: Model | FixedLinearModel = attrs.field()
imag: Model | FixedLinearModel = attrs.field()
[docs] def at(self, **kwargs) -> FixedLinearModel:
"""Get an evaluated linear model."""
return attrs.evolve(
self,
real=self.real.at(**kwargs),
imag=self.imag.at(**kwargs),
)
def __call__(
self,
x: np.ndarray | None = None,
parameters: Sequence | None = None,
) -> np.ndarray:
"""Evaluate the model.
Parameters
----------
x
The co-ordinates at which to evaluate the model (by default, use
``default_x``).
params
A list/array of parameters at which to evaluate the model. Will use the
instance's parameters if available. If using a subset of the basis
functions, you can pass a subset of parameters.
Returns
-------
model
The model evaluated at the input ``x`` or ``basis``.
"""
return self.real(
x=x,
parameters=parameters[: self.real.n_terms]
if parameters is not None
else None,
) + 1j * self.imag(
x=x,
parameters=parameters[self.real.n_terms :]
if parameters is not None
else None,
)
[docs] def fit(
self,
ydata: np.ndarray,
weights: np.ndarray | float = 1.0,
xdata: np.ndarray | None = None,
):
"""Create a linear-regression fit object."""
if isinstance(self.real, FixedLinearModel):
real = self.real
else:
real = self.real.at(x=xdata)
if isinstance(self.imag, FixedLinearModel):
imag = self.imag
else:
imag = self.imag.at(x=xdata)
real = real.fit(np.real(ydata), weights=weights).fit
imag = imag.fit(np.imag(ydata), weights=weights).fit
return attrs.evolve(self, real=real, imag=imag)
[docs]@hickleable()
@attrs.define(frozen=True, slots=False)
class ComplexMagPhaseModel(yaml.YAMLObject):
"""A composite model that is specifically for complex functions in mag/phase."""
yaml_tag = "ComplexMagPhaseModel"
mag: Model | FixedLinearModel = attrs.field()
phs: Model | FixedLinearModel = attrs.field()
[docs] def at(self, **kwargs) -> FixedLinearModel:
"""Get an evaluated linear model."""
return attrs.evolve(
self,
mag=self.mag.at(**kwargs),
phs=self.phs.at(**kwargs),
)
def __call__(
self,
x: np.ndarray | None = None,
parameters: Sequence | None = None,
) -> np.ndarray:
"""Evaluate the model.
Parameters
----------
x
The co-ordinates at which to evaluate the model (by default, use
``default_x``).
params
A list/array of parameters at which to evaluate the model. Will use the
instance's parameters if available. If using a subset of the basis
functions, you can pass a subset of parameters.
Returns
-------
model : np.ndarray
The model evaluated at the input ``x`` or ``basis``.
"""
return self.mag(
x=x,
parameters=parameters[: self.mag.n_terms]
if parameters is not None
else None,
) * np.exp(
1j
* self.phs(
x=x,
parameters=parameters[self.mag.n_terms :]
if parameters is not None
else None,
)
)
[docs] def fit(
self,
ydata: np.ndarray,
weights: np.ndarray | float = 1.0,
xdata: np.ndarray | None = None,
):
"""Create a linear-regression fit object."""
if isinstance(self.mag, FixedLinearModel):
mag = self.mag
else:
mag = self.mag.at(x=xdata)
if isinstance(self.phs, FixedLinearModel):
phs = self.phs
else:
phs = self.phs.at(x=xdata)
mag = mag.fit(np.abs(ydata), weights=weights).fit
phs = phs.fit(np.unwrap(np.angle(ydata)), weights=weights).fit
return attrs.evolve(self, mag=mag, phs=phs)
[docs]@hickleable()
@attrs.define(frozen=True, kw_only=True, slots=False)
class NoiseWaves:
freq: np.ndarray = attrs.field()
gamma_src: dict[str, np.ndarray] = attrs.field()
gamma_rec: np.ndarray = attrs.field()
c_terms: int = attrs.field(default=5)
w_terms: int = attrs.field(default=6)
parameters: Sequence | None = attrs.field(default=None)
with_tload: bool = attrs.field(default=True)
@cached_property
def src_names(self) -> tuple[str]:
"""List of names of inputs sources (eg. ambient, hot_load, open, short)."""
return tuple(self.gamma_src.keys())
[docs] def get_linear_model(self, with_k: bool = True) -> CompositeModel:
"""Define and return a Model.
Parameters
----------
with_k
Whether to use the K matrix as an "extra basis" in the linear model.
"""
if with_k:
# K should be a an array of shape (Nsrc Nnu x Nnoisewaveterms)
K = np.hstack(
tuple(
rcf.get_K(gamma_rec=self.gamma_rec, gamma_ant=s11src)
for s11src in self.gamma_src.values()
)
)
# x is the frequencies repeated for every input source
x = np.tile(self.freq, len(self.gamma_src))
tr = UnitTransform(range=(x.min(), x.max()))
models = {
"tunc": Polynomial(
n_terms=self.w_terms,
parameters=self.parameters[: self.w_terms]
if self.parameters is not None
else None,
transform=tr,
),
"tcos": Polynomial(
n_terms=self.w_terms,
parameters=self.parameters[self.w_terms : 2 * self.w_terms]
if self.parameters is not None
else None,
transform=tr,
),
"tsin": Polynomial(
n_terms=self.w_terms,
parameters=self.parameters[2 * self.w_terms : 3 * self.w_terms]
if self.parameters is not None
else None,
transform=tr,
),
}
if with_k:
extra_basis = {"tunc": K[1], "tcos": K[2], "tsin": K[3]}
if self.with_tload:
models["tload"] = Polynomial(
n_terms=self.c_terms,
parameters=self.parameters[3 * self.w_terms :]
if self.parameters is not None
else None,
transform=tr,
)
if with_k:
extra_basis["tload"] = -1 * np.ones(len(x))
if with_k:
return CompositeModel(models=models, extra_basis=extra_basis).at(x=x)
else:
return CompositeModel(models=models).at(x=x)
@cached_property
def linear_model(self) -> CompositeModel:
"""The actual composite linear model object associated with the noise waves."""
return self.get_linear_model()
[docs] def get_noise_wave(
self,
noise_wave: str,
parameters: Sequence | None = None,
src: str | None = None,
) -> np.ndarray:
"""Get the model for a particular noise-wave term."""
out = self.linear_model.model.get_model(
noise_wave,
parameters=parameters,
x=self.linear_model.x,
with_extra=bool(src),
)
if src:
indx = self.src_names.index(src)
return out[indx * len(self.freq) : (indx + 1) * len(self.freq)]
else:
return out[: len(self.freq)]
[docs] def get_full_model(
self, src: str, parameters: Sequence | None = None
) -> np.ndarray:
"""Get the full model (all noise-waves) for a particular input source."""
out = self.linear_model(parameters=parameters)
indx = self.src_names.index(src)
return out[indx * len(self.freq) : (indx + 1) * len(self.freq)]
[docs] def get_fitted(
self, data: np.ndarray, weights: np.ndarray | None = None
) -> NoiseWaves:
"""Get a new noise wave model with fitted parameters."""
fit = self.linear_model.fit(ydata=data, weights=weights)
return attrs.evolve(self, parameters=fit.model_parameters)
[docs] def with_params_from_calobs(self, calobs, cterms=None, wterms=None) -> NoiseWaves:
"""Get a new noise wave model with parameters fitted using standard methods."""
cterms = cterms or calobs.cterms
wterms = wterms or calobs.wterms
def modify(thing, n):
if len(thing) < n:
return thing + [0] * (n - len(thing))
elif len(thing) > n:
return thing[:n]
else:
return thing
tu = modify(calobs.Tunc_poly.coefficients[::-1].tolist(), wterms)
tc = modify(calobs.Tcos_poly.coefficients[::-1].tolist(), wterms)
ts = modify(calobs.Tsin_poly.coefficients[::-1].tolist(), wterms)
if self.with_tload:
c2 = (-calobs.C2_poly.coefficients[::-1]).tolist()
c2[0] += calobs.t_load
c2 = modify(c2, cterms)
return attrs.evolve(self, parameters=tu + tc + ts + c2)
[docs] def get_data_from_calobs(
self,
calobs,
tns: Model | None = None,
sim: bool = False,
loads: dict | None = None,
) -> np.ndarray:
"""Generate input data to fit from a calibration observation."""
if loads is None:
loads = calobs.loads
data = []
for src in self.src_names:
load = loads[src]
if tns is None:
_tns = calobs.C1() * calobs.t_load_ns
else:
_tns = tns(x=calobs.freq.freq)
q = (
simulate_q_from_calobs(calobs, load=src)
if sim
else load.spectrum.averaged_Q
)
c = calobs.get_K()[src][0]
data.append(_tns * q - c * load.temp_ave)
return np.concatenate(tuple(data))
[docs] @classmethod
def from_calobs(
cls,
calobs,
cterms=None,
wterms=None,
sources=None,
with_tload: bool = True,
loads: dict | None = None,
) -> NoiseWaves:
"""Initialize a noise wave model from a calibration observation."""
if loads is None:
if sources is None:
sources = calobs.load_names
loads = {src: load for src, load in calobs.loads.items() if src in sources}
freq = calobs.freq.freq.to_value("MHz")
gamma_src = {name: source.s11_model(freq) for name, source in loads.items()}
try:
lna_s11 = calobs.receiver.s11_model(freq)
except AttributeError:
lna_s11 = calobs.receiver_s11(freq)
nw_model = cls(
freq=freq,
gamma_src=gamma_src,
gamma_rec=lna_s11,
c_terms=cterms or calobs.cterms,
w_terms=wterms or calobs.wterms,
with_tload=with_tload,
)
return nw_model.with_params_from_calobs(calobs, cterms=cterms, wterms=wterms)
def __call__(self, **kwargs) -> np.ndarray:
"""Call the underlying linear model."""
return self.linear_model(**kwargs)
[docs]@hickleable()
@attrs.define(frozen=True, slots=False)
class ModelFit:
"""A class representing a fit of model to data.
Parameters
----------
model
The evaluatable model to fit to the data.
ydata
The values of the measured data.
weights
The weight of the measured data at each point. This corresponds to the
*variance* of the measurement (not the standard deviation). This is
appropriate if the weights represent the number of measurements going into
each piece of data.
Raises
------
ValueError
If model_type is not str, or a subclass of :class:`Model`.
"""
model: FixedLinearModel = attrs.field()
ydata: np.ndarray = attrs.field()
weights: np.ndarray | float = attrs.field(
default=1.0, validator=attrs.validators.instance_of((np.ndarray, float))
)
@ydata.validator
def _ydata_vld(self, att, val):
assert val.shape == self.model.x.shape
@weights.validator
def _weights_vld(self, att, val):
if isinstance(val, np.ndarray):
assert val.shape == self.model.x.shape
@cached_property
def degrees_of_freedom(self) -> int:
"""The number of degrees of freedom of the fit."""
return self.model.x.size - self.model.model.n_terms - 1
@cached_property
def fit(self) -> FixedLinearModel:
"""A model that has parameters set based on the best fit to this data."""
if np.isscalar(self.weights):
pars = self._ls(self.model.basis, self.ydata)
else:
pars = self._wls(self.model.basis, self.ydata, w=self.weights)
# Create a new model with the same parameters but specific parameters and xdata.
return self.model.with_params(parameters=pars)
def _wls(self, van, y, w):
"""Ripped straight outta numpy for speed.
Note: this function is written purely for speed, and is intended to *not*
be highly generic. Don't replace this by statsmodels or even np.polyfit. They
are significantly slower (>4x for statsmodels, 1.5x for polyfit).
"""
# set up the least squares matrices and apply weights.
# Don't use inplace operations as they
# can cause problems with NA.
mask = w > 0
lhs = van[:, mask] * w[mask]
rhs = y[mask] * w[mask]
rcond = y.size * np.finfo(y.dtype).eps
# Determine the norms of the design matrix columns.
scl = np.sqrt(np.square(lhs).sum(1))
scl[scl == 0] = 1
# Solve the least squares problem.
c, resids, rank, s = np.linalg.lstsq((lhs.T / scl), rhs.T, rcond)
c = (c.T / scl).T
return c
def _ls(self, van, y):
"""Ripped straight outta numpy for speed.
Note: this function is written purely for speed, and is intended to *not*
be highly generic. Don't replace this by statsmodels or even np.polyfit. They
are significantly slower (>4x for statsmodels, 1.5x for polyfit).
"""
rcond = y.size * np.finfo(y.dtype).eps
# Determine the norms of the design matrix columns.
scl = np.sqrt(np.square(van).sum())
# Solve the least squares problem.
return np.linalg.lstsq((van.T / scl), y.T, rcond)[0] / scl
@cached_property
def model_parameters(self):
"""The best-fit model parameters."""
# Parameters need to be copied into this object, otherwise a new fit on the
# parent model will change the model_parameters of this fit!
return copy(self.fit.model.parameters)
[docs] def evaluate(self, x: np.ndarray | None = None) -> np.ndarray:
"""Evaluate the best-fit model.
Parameters
----------
x : np.ndarray, optional
The co-ordinates at which to evaluate the model. By default, use the input
data co-ordinates.
Returns
-------
y : np.ndarray
The best-fit model evaluated at ``x``.
"""
return self.fit(x=x)
@cached_property
def residual(self) -> np.ndarray:
"""Residuals of data to model."""
return self.ydata - self.evaluate()
@cached_property
def weighted_chi2(self) -> float:
"""The chi^2 of the weighted fit."""
return np.dot(self.residual.T, self.weights * self.residual)
[docs] def reduced_weighted_chi2(self) -> float:
"""The weighted chi^2 divided by the degrees of freedom."""
return (1 / self.degrees_of_freedom) * self.weighted_chi2
[docs] def weighted_rms(self) -> float:
"""The weighted root-mean-square of the residuals."""
return np.sqrt(self.weighted_chi2) / np.sum(self.weights)
@cached_property
def hessian(self):
"""The Hessian matrix of the linear parameters."""
b = self.model.basis
w = self.weights
return (b * w).dot(b.T)
@cached_property
def parameter_covariance(self) -> np.ndarray:
"""The Covariance matrix of the parameters."""
return np.linalg.inv(self.hessian)
[docs] def get_sample(self, size: int | tuple[int] = 1):
"""Generate a random sample from the posterior distribution."""
return np.random.multivariate_normal(
mean=self.model_parameters, cov=self.parameter_covariance, size=size
)
def _model_yaml_constructor(
loader: yaml.SafeLoader, node: yaml.nodes.MappingNode
) -> Model:
mapping = loader.construct_mapping(node, deep=True)
model = get_mdl(mapping.pop("model"))
return model(**mapping)
def _model_yaml_representer(
dumper: yaml.SafeDumper, model: Model
) -> yaml.nodes.MappingNode:
model_dct = attrs.asdict(model, recurse=False)
model_dct.update(model=model.__class__.__name__.lower())
if model_dct["parameters"] is not None:
model_dct["parameters"] = tuple(float(x) for x in model_dct["parameters"])
return dumper.represent_mapping("!Model", model_dct)
yaml.FullLoader.add_constructor("!Model", _model_yaml_constructor)
yaml.Loader.add_constructor("!Model", _model_yaml_constructor)
yaml.BaseLoader.add_constructor("!Model", _model_yaml_constructor)
yaml.add_multi_representer(Model, _model_yaml_representer)
yaml.add_multi_representer(ModelTransform, _transform_yaml_representer)
Modelable = Union[str, Type[Model]]