"""
The main user-facing module of ``edges-cal``.
This module contains wrappers around lower-level functions in other modules, providing
a one-stop interface for everything related to calibration.
"""
from __future__ import annotations
import attr
import h5py
import hickle
import numpy as np
import warnings
from astropy import units as un
from astropy.convolution import Gaussian1DKernel, convolve
from astropy.io.misc import yaml as ayaml
from edges_io import io
from edges_io.logging import logger
from functools import partial
from hickleable import hickleable
from matplotlib import pyplot as plt
from pathlib import Path
from scipy.interpolate import InterpolatedUnivariateSpline as Spline
from types import SimpleNamespace
from typing import Any, Callable, Sequence
from . import modelling as mdl
from . import receiver_calibration_func as rcf
from . import reflection_coefficient as rc
from . import s11
from . import types as tp
from .cached_property import cached_property, safe_property
from .spectra import LoadSpectrum
from .tools import FrequencyRange, bin_array, get_data_path
[docs]@hickleable()
@attr.s(kw_only=True)
class HotLoadCorrection:
"""
Corrections for the hot load.
Measurements required to define the HotLoad temperature, from Monsalve et al.
(2017), Eq. 8+9.
Parameters
----------
path
Path to a file containing measurements of the semi-rigid cable reflection
parameters. A preceding colon (:) indicates to prefix with DATA_PATH.
The default file was measured in 2015, but there is also a file included
that can be used from 2017: ":semi_rigid_s_parameters_2017.txt".
f_low, f_high
Lowest/highest frequency to retain from measurements.
n_terms
The number of terms used in fitting S-parameters of the cable.
"""
freq: FrequencyRange = attr.ib()
raw_s11: np.ndarray = attr.ib(eq=attr.cmp_using(eq=np.array_equal))
raw_s12s21: np.ndarray = attr.ib(eq=attr.cmp_using(eq=np.array_equal))
raw_s22: np.ndarray = attr.ib(eq=attr.cmp_using(eq=np.array_equal))
model: mdl.Model = attr.ib(mdl.Polynomial(n_terms=21))
complex_model: type[mdl.ComplexRealImagModel] | type[
mdl.ComplexMagPhaseModel
] = attr.ib(mdl.ComplexMagPhaseModel)
use_spline: bool = attr.ib(False)
[docs] @classmethod
def from_file(
cls,
path: tp.PathLike = ":semi_rigid_s_parameters_WITH_HEADER.txt",
f_low: tp.FreqType = 0 * un.MHz,
f_high: tp.FreqType = np.inf * un.MHz,
set_transform_range: bool = True,
**kwargs,
):
"""Instantiate the HotLoadCorrection from file.
Parameters
----------
path
Path to the S-parameters file.
f_low, f_high
The min/max frequencies to use in the modelling.
"""
path = get_data_path(path)
data = np.genfromtxt(path)
freq = FrequencyRange(data[:, 0] * un.MHz, f_low=f_low, f_high=f_high)
data = data[freq.mask]
if data.shape[1] == 7: # Original file from 2015
data = data[:, 1::2] + 1j * data[:, 2::2]
elif data.shape[1] == 6: # File from 2017
data = np.array(
[
data[:, 1] + 1j * data[:, 2],
data[:, 3],
data[:, 4] + 1j * data[:, 5],
]
).T
model = kwargs.pop(
"model",
mdl.Polynomial(
n_terms=21,
transform=mdl.UnitTransform(
range=(freq.min.to_value("MHz"), freq.max.to_value("MHz"))
),
),
)
if hasattr(model.transform, "range") and set_transform_range:
model = attr.evolve(
model,
transform=attr.evolve(
model.transform,
range=(freq.min.to_value("MHz"), freq.max.to_value("MHz")),
),
)
return cls(
freq=freq,
raw_s11=data[:, 0],
raw_s12s21=data[:, 1],
raw_s22=data[:, 2],
model=model,
**kwargs,
)
def _get_model(self, raw_data: np.ndarray):
model = self.complex_model(self.model, self.model)
return model.fit(xdata=self.freq.freq, ydata=raw_data)
def _get_splines(self, data):
if self.complex_model == mdl.ComplexRealImagModel:
return (
Spline(self.freq.freq.to_value("MHz"), np.real(data)),
Spline(self.freq.freq.to_value("MHz"), np.imag(data)),
)
else:
return (
Spline(self.freq.freq.to_value("MHz"), np.abs(data)),
Spline(self.freq.freq.to_value("MHz"), np.angle(data)),
)
def _ev_splines(self, splines):
rl, im = splines
if self.complex_model == mdl.ComplexRealImagModel:
return lambda freq: rl(freq) + 1j * im(freq)
else:
return lambda freq: rl(freq) * np.exp(1j * im(freq))
@cached_property
def s11_model(self):
"""The reflection coefficient."""
if not self.use_spline:
return self._get_model(self.raw_s11)
else:
splines = self._get_splines(self.raw_s11)
return self._ev_splines(splines)
@cached_property
def s12s21_model(self):
"""The transmission coefficient."""
if not self.use_spline:
return self._get_model(self.raw_s12s21)
else:
splines = self._get_splines(self.raw_s12s21)
return self._ev_splines(splines)
@cached_property
def s22_model(self):
"""The reflection coefficient from the other side."""
if not self.use_spline:
return self._get_model(self.raw_s22)
else:
splines = self._get_splines(self.raw_s22)
return self._ev_splines(splines)
[docs] def power_gain(self, freq: tp.FreqType, hot_load_s11: s11.LoadS11) -> np.ndarray:
"""
Calculate the power gain.
Parameters
----------
freq : np.ndarray
The frequencies.
hot_load_s11 : :class:`LoadS11`
The S11 of the hot load.
Returns
-------
gain : np.ndarray
The power gain as a function of frequency.
"""
assert isinstance(hot_load_s11, s11.LoadS11), "hot_load_s11 must be a LoadS11"
assert (
hot_load_s11.load_name == "hot_load"
), "hot_load_s11 must be a hot_load s11"
return self.get_power_gain(
{
"s11": self.s11_model(freq.to_value("MHz")),
"s12s21": self.s12s21_model(freq.to_value("MHz")),
"s22": self.s22_model(freq.to_value("MHz")),
},
hot_load_s11.s11_model(freq.to_value("MHz")),
)
[docs] @staticmethod
def get_power_gain(
semi_rigid_sparams: dict, hot_load_s11: np.ndarray
) -> np.ndarray:
"""Define Eq. 9 from M17.
Parameters
----------
semi_rigid_sparams : dict
A dictionary of reflection coefficient measurements as a function of
frequency for the semi-rigid cable.
hot_load_s11 : array-like
The S11 measurement of the hot_load.
Returns
-------
gain : np.ndarray
The power gain.
"""
rht = rc.gamma_de_embed(
semi_rigid_sparams["s11"],
semi_rigid_sparams["s12s21"],
semi_rigid_sparams["s22"],
hot_load_s11,
)
return (
np.abs(semi_rigid_sparams["s12s21"])
* (1 - np.abs(rht) ** 2)
/ (
(np.abs(1 - semi_rigid_sparams["s11"] * rht)) ** 2
* (1 - np.abs(hot_load_s11) ** 2)
)
)
[docs]@hickleable()
@attr.s(kw_only=True)
class Load:
"""Wrapper class containing all relevant information for a given load.
Parameters
----------
spectrum : :class:`LoadSpectrum`
The spectrum for this particular load.
reflections : :class:`SwitchCorrection`
The S11 measurements for this particular load.
hot_load_correction : :class:`HotLoadCorrection`
If this is a hot load, provide a hot load correction.
ambient : :class:`LoadSpectrum`
If this is a hot load, need to provide an ambient spectrum to correct it.
"""
spectrum: LoadSpectrum = attr.ib()
reflections: s11.LoadS11 = attr.ib()
_loss_model: Callable[
[np.ndarray], np.ndarray
] | HotLoadCorrection | None = attr.ib(default=None)
ambient_temperature: float = attr.ib(default=298.0)
@property
def loss_model(self):
"""The loss model as a callable function of frequency."""
if isinstance(self._loss_model, HotLoadCorrection):
return partial(self._loss_model.power_gain, hot_load_s11=self.reflections)
else:
return self._loss_model
@property
def load_name(self) -> str:
"""The name of the load."""
return self.reflections.load_name
[docs] @classmethod
def from_io(
cls,
io_obj: io.CalibrationObservation,
load_name: str,
f_low: tp.FreqType = 40 * un.MHz,
f_high: tp.FreqType = np.inf * un.MHz,
reflection_kwargs: dict | None = None,
spec_kwargs: dict | None = None,
loss_kwargs: dict | None = None,
ambient_temperature: float | None = None,
):
"""
Define a full :class:`Load` from a path and name.
Parameters
----------
path : str or Path
Path to the top-level calibration observation.
load_name : str
Name of a load to define.
f_low, f_high : float
Min/max frequencies to keep in measurements.
reflection_kwargs : dict
Extra arguments to pass through to :class:`SwitchCorrection`.
spec_kwargs : dict
Extra arguments to pass through to :class:`LoadSpectrum`.
ambient_temperature
The ambient temperature to use for the loss, if required (required for new
hot loads). By default, read an ambient load's actual temperature reading
from the io object.
Returns
-------
load : :class:`Load`
The load object, containing all info about spectra and S11's for that load.
"""
if not spec_kwargs:
spec_kwargs = {}
if not reflection_kwargs:
reflection_kwargs = {}
loss_kwargs = loss_kwargs or {}
# Fill up kwargs with keywords from this instance
# TODO: here we only use the calkit defined for the FIRST switching_state,
# instead of using each calkit for each switching_state. To fix this, we require
# having meta information inside the S11/ directory.
if "internal_switch_kwargs" not in reflection_kwargs:
reflection_kwargs["internal_switch_kwargs"] = {}
if "calkit" not in reflection_kwargs["internal_switch_kwargs"]:
reflection_kwargs["internal_switch_kwargs"]["calkit"] = rc.get_calkit(
rc.AGILENT_85033E,
resistance_of_match=io_obj.definition["measurements"]["resistance_m"][
io_obj.s11.switching_state[0].run_num
],
)
# For the LoadSpectrum, we can specify both f_low/f_high and f_range_keep.
# The first pair is what defines what gets read in and smoothed/averaged.
# The second pair then selects a part of this range to keep for doing
# calibration with.
if "f_low" not in spec_kwargs:
spec_kwargs["f_low"] = f_low
if "f_high" not in spec_kwargs:
spec_kwargs["f_high"] = f_high
spec = LoadSpectrum.from_io(
io_obs=io_obj,
load_name=load_name,
f_range_keep=(f_low, f_high),
**spec_kwargs,
)
refl = s11.LoadS11.from_io(
io_obj.s11,
load_name,
f_low=f_low,
f_high=f_high,
**reflection_kwargs,
)
if load_name == "hot_load":
hlc = HotLoadCorrection.from_file(f_low=f_low, f_high=f_high, **loss_kwargs)
if ambient_temperature is None:
ambient_temperature = LoadSpectrum.from_io(
io_obj,
load_name="ambient",
f_range_keep=(f_low, f_high),
**spec_kwargs,
).temp_ave
return cls(
spectrum=spec,
reflections=refl,
loss_model=hlc,
ambient_temperature=ambient_temperature,
)
else:
return cls(spectrum=spec, reflections=refl)
[docs] def get_temp_with_loss(self, freq: tp.FreqType | None = None):
"""Calculate the temperature of the load accounting for loss."""
if freq is None:
freq = self.freq.freq
if self.loss_model is None:
return self.spectrum.temp_ave * np.ones(len(freq))
gain = self.loss_model(freq)
return gain * self.spectrum.temp_ave + (1 - gain) * self.ambient_temperature
@cached_property
def temp_ave(self) -> np.ndarray:
"""The average temperature of the thermistor (over frequency and time)."""
return self.get_temp_with_loss()
@property
def averaged_Q(self) -> np.ndarray:
"""The average spectrum power ratio, Q (over time)."""
return self.spectrum.q
@property
def averaged_spectrum(self) -> np.ndarray:
"""The average uncalibrated spectrum (over time)."""
return self.spectrum.averaged_spectrum
@property
def t_load(self) -> float:
"""The assumed temperature of the internal load."""
return self.spectrum.t_load
@property
def t_load_ns(self) -> float:
"""The assumed temperature of the internal load + noise source."""
return self.spectrum.t_load_ns
@property
def s11_model(self) -> Callable[[np.ndarray], np.ndarray]:
"""Callable S11 model as function of frequency."""
return self.reflections.s11_model
@property
def freq(self) -> FrequencyRange:
"""Frequencies of the spectrum."""
return self.spectrum.freq
[docs] def with_calkit(self, calkit: rc.Calkit):
"""Return a new Load with updated calkit."""
if "calkit" not in self.reflections.metadata:
raise RuntimeError(
"Cannot clone with new calkit since calkit is unknown for the load"
)
loads11 = [
attr.evolve(x, calkit=calkit)
for x in self.reflections.metadata["load_s11s"]
]
isw = self.reflections.internal_switch.with_new_calkit(calkit)
return attr.evolve(
self,
reflections=s11.LoadS11.from_load_and_internal_switch(
load_s11=loads11, internal_switch=isw, base=self.reflections
),
)
[docs]@hickleable()
@attr.s
class CalibrationObservation:
"""
A composite object representing a full Calibration Observation.
This includes spectra of all calibrators, and methods to find the calibration
parameters. It strictly follows Monsalve et al. (2017) in its formalism.
While by default the class uses the calibrator sources ("ambient", "hot_load",
"open", "short"), it can be modified to take other sources by setting
``CalibrationObservation._sources`` to a new tuple of strings.
Parameters
----------
loads
dictionary of load names to Loads
receiver
The object defining the reflection coefficient of the receiver.
cterms
The number of polynomial terms used for the scaling/offset functions
wterms
The number of polynomial terms used for the noise-wave parameters.
metadata
Metadata associated with the data.
"""
loads: dict[str, Load] = attr.ib()
receiver: s11.Receiver = attr.ib()
cterms: int = attr.ib(default=5, kw_only=True)
wterms: int = attr.ib(default=7, kw_only=True)
_metadata: dict[str, Any] = attr.ib(default=attr.Factory(dict), kw_only=True)
@property
def metadata(self):
"""Metadata associated with the object."""
return self._metadata
def __attrs_post_init__(self):
"""Set the loads as attributes directly."""
for k, v in self.loads.items():
setattr(self, k, v)
[docs] @classmethod
def from_io(
cls,
io_obj: io.CalibrationObservation,
*,
semi_rigid_path: tp.PathLike = ":semi_rigid_s_parameters_WITH_HEADER.txt",
freq_bin_size: int = 1,
spectrum_kwargs: dict[str, dict[str, Any]] | None = None,
s11_kwargs: dict[str, dict[str, Any]] | None = None,
internal_switch_kwargs: dict[str, Any] | None = None,
lna_kwargs: dict[str, Any] | None = None,
f_low: tp.FreqType = 40.0 * un.MHz,
f_high: tp.FreqType = np.inf * un.MHz,
sources: tuple[str] = ("ambient", "hot_load", "open", "short"),
receiver_kwargs: dict[str, Any] | None = None,
restrict_s11_model_freqs: bool = True,
hot_load_loss_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> CalibrationObservation:
"""Create the object from an edges-io observation.
Parameters
----------
io_obj
An calibration observation object from which all the data can be read.
semi_rigid_path : str or Path, optional
Path to a file containing S11 measurements for the semi rigid cable. Used to
correct the hot load S11. Found automatically if not given.
freq_bin_size
The size of each frequency bin (of the spectra) in units of the raw size.
spectrum_kwargs
Keyword arguments used to instantiate the calibrator :class:`LoadSpectrum`
objects. See its documentation for relevant parameters. Parameters specified
here are used for _all_ calibrator sources.
s11_kwargs
Keyword arguments used to instantiate the calibrator :class:`LoadS11`
objects. See its documentation for relevant parameters. Parameters specified
here are used for _all_ calibrator sources.
internal_switch_kwargs
Keyword arguments used to instantiate the :class:`~s11.InternalSwitch`
objects. See its documentation for relevant parameters. The same internal
switch is used to calibrate the S11 for each input source.
f_low : float
Minimum frequency to keep for all loads (and their S11's). If for some
reason different frequency bounds are desired per-load, one can pass in
full load objects through ``load_spectra``.
f_high : float
Maximum frequency to keep for all loads (and their S11's). If for some
reason different frequency bounds are desired per-load, one can pass in
full load objects through ``load_spectra``.
sources
A sequence of strings specifying which loads to actually use in the
calibration. Default is all four standard calibrators.
receiver_kwargs
Keyword arguments used to instantiate the calibrator :class:`~s11.Receiver`
objects. See its documentation for relevant parameters. ``lna_kwargs`` is a
deprecated alias.
restrict_s11_model_freqs
Whether to restrict the S11 modelling (i.e. smoothing) to the given freq
range. The final output will be calibrated only between the given freq
range, but the S11 models themselves can be fit over a broader set of
frequencies.
"""
if f_high < f_low:
raise ValueError("f_high must be larger than f_low!")
if lna_kwargs is not None:
warnings.warn(
"Use of 'lna_kwargs' is deprecated, use 'receiver_kwargs' instead."
)
receiver_kwargs = lna_kwargs
spectrum_kwargs = spectrum_kwargs or {}
s11_kwargs = s11_kwargs or {}
internal_switch_kwargs = internal_switch_kwargs or {}
receiver_kwargs = receiver_kwargs or {}
hot_load_loss_kwargs = hot_load_loss_kwargs or {}
for v in [spectrum_kwargs, s11_kwargs, internal_switch_kwargs, receiver_kwargs]:
assert isinstance(v, dict)
f_low = f_low.to("MHz", copy=False)
f_high = f_high.to("MHz", copy=False)
if "calkit" not in receiver_kwargs:
receiver_kwargs["calkit"] = rc.get_calkit(
rc.AGILENT_85033E,
resistance_of_match=io_obj.definition.get("measurements", {})
.get("resistance_f", {})
.get(io_obj.s11.receiver_reading[0].run_num, 50.0 * un.Ohm),
)
receiver = s11.Receiver.from_io(
device=io_obj.s11.receiver_reading,
f_low=f_low if restrict_s11_model_freqs else 0 * un.MHz,
f_high=f_high if restrict_s11_model_freqs else np.inf * un.MHz,
**receiver_kwargs,
)
f_low = max(receiver.freq.min, f_low)
f_high = min(receiver.freq.max, f_high)
if "default" not in spectrum_kwargs:
spectrum_kwargs["default"] = {}
if "freq_bin_size" not in spectrum_kwargs["default"]:
spectrum_kwargs["default"]["freq_bin_size"] = freq_bin_size
def get_load(name, ambient_temperature=None):
return Load.from_io(
io_obj=io_obj,
load_name=name,
f_low=f_low,
f_high=f_high,
reflection_kwargs={
**s11_kwargs.get("default", {}),
**s11_kwargs.get(name, {}),
**{"internal_switch_kwargs": internal_switch_kwargs},
},
spec_kwargs={
**spectrum_kwargs["default"],
**spectrum_kwargs.get(name, {}),
},
loss_kwargs={**hot_load_loss_kwargs, **{"path": semi_rigid_path}},
ambient_temperature=ambient_temperature,
)
loads = {}
for src in sources:
loads[src] = get_load(
src,
ambient_temperature=loads["ambient"].spectrum.temp_ave
if src == "hot_load"
else None,
)
return cls(
loads=loads,
receiver=receiver,
metadata={
"path": io_obj.path,
"s11_kwargs": s11_kwargs,
"lna_kwargs": lna_kwargs,
"spectra": {
name: load.spectrum.metadata for name, load in loads.items()
},
"io": io_obj,
},
**kwargs,
)
[docs] def with_load_calkit(self, calkit, loads: Sequence[str] = None):
"""Return a new observation with loads having given calkit."""
if loads is None:
loads = self.load_names
elif isinstance(loads, str):
loads = [loads]
loads = {
name: load.with_calkit(calkit) if name in loads else load
for name, load in self.loads.items()
}
return attr.evolve(self, loads=loads)
@safe_property
def t_load(self) -> float:
"""Assumed temperature of the load."""
return self.loads[list(self.loads.keys())[0]].t_load
@safe_property
def t_load_ns(self) -> float:
"""Assumed temperature of the load + noise source."""
return self.loads[list(self.loads.keys())[0]].t_load_ns
@cached_property
def freq(self) -> FrequencyRange:
"""The frequencies at which spectra were measured."""
return self.loads[list(self.loads.keys())[0]].freq
@safe_property
def internal_switch(self):
"""The S11 object representing the internal switch."""
return self.loads[self.load_names[0]].reflections.internal_switch
@safe_property
def load_names(self) -> tuple[str]:
"""Names of the loads."""
return tuple(self.loads.keys())
[docs] def new_load(
self,
load_name: str,
io_obj: io.CalibrationObservation,
reflection_kwargs: dict | None = None,
spec_kwargs: dict | None = None,
):
"""Create a new load with the given load name.
Uses files inside the current observation.
Parameters
----------
load_name : str
The name of the load
run_num_spec : dict or int
Run number to use for the spectrum.
run_num_load : dict or int
Run number to use for the load's S11.
reflection_kwargs : dict
Keyword arguments to construct the :class:`SwitchCorrection`.
spec_kwargs : dict
Keyword arguments to construct the :class:`LoadSpectrum`.
"""
reflection_kwargs = reflection_kwargs or {}
spec_kwargs = spec_kwargs or {}
spec_kwargs["freq_bin_size"] = self.freq.bin_size
spec_kwargs["t_load"] = self.open.spectrum.t_load
spec_kwargs["t_load_ns"] = self.open.spectrum.t_load_ns
if "frequency_smoothing" not in spec_kwargs:
spec_kwargs["frequency_smoothing"] = self.open.spectrum.metadata[
"frequency_smoothing"
]
spec_kwargs["f_low"] = self.freq._f_low
spec_kwargs["f_high"] = self.freq._f_high
return Load.from_io(
io_obj=io_obj,
load_name=load_name,
f_low=self.freq.post_bin_f_low,
f_high=self.freq.post_bin_f_high,
reflection_kwargs=reflection_kwargs,
spec_kwargs=spec_kwargs,
)
[docs] def plot_raw_spectra(self, fig=None, ax=None) -> plt.Figure:
"""
Plot raw uncalibrated spectra for all calibrator sources.
Parameters
----------
fig : :class:`plt.Figure`
A matplotlib figure on which to make the plot. By default creates a new one.
ax : :class:`plt.Axes`
A matplotlib Axes on which to make the plot. By default creates a new one.
Returns
-------
fig : :class:`plt.Figure`
The figure on which the plot was made.
"""
if fig is None and ax is None:
fig, ax = plt.subplots(
len(self.loads), 1, sharex=True, gridspec_kw={"hspace": 0.05}
)
for i, (name, load) in enumerate(self.loads.items()):
ax[i].plot(load.freq.freq, load.averaged_spectrum)
ax[i].set_ylabel("$T^*$ [K]")
ax[i].set_title(name)
ax[i].grid(True)
ax[-1].set_xlabel("Frequency [MHz]")
return fig
[docs] def plot_s11_models(self, **kwargs):
"""
Plot residuals of S11 models for all sources.
Returns
-------
dict:
Each entry has a key of the source name, and the value is a matplotlib fig.
"""
out = {
name: source.reflections.plot_residuals(**kwargs)
for name, source in self.loads.items()
}
out.update({"lna": self.receiver.plot_residuals(**kwargs)})
return out
@cached_property
def s11_correction_models(self):
"""Dictionary of S11 correction models, one for each source."""
try:
return dict(self._injected_source_s11s)
except (TypeError, AttributeError):
return {
name: source.s11_model(self.freq.freq.to_value("MHz"))
for name, source in self.loads.items()
}
@cached_property
def source_thermistor_temps(self) -> dict[str, float | np.ndarray]:
"""Dictionary of input source thermistor temperatures."""
if (
hasattr(self, "_injected_source_temps")
and self._injected_source_temps is not None
):
return self._injected_source_temps
return {k: source.temp_ave for k, source in self.loads.items()}
@cached_property
def _calibration_coefficients(self):
"""The calibration polynomials, evaluated at `freq.freq`."""
if (
hasattr(self, "_injected_averaged_spectra")
and self._injected_averaged_spectra is not None
):
ave_spec = self._injected_averaged_spectra
else:
ave_spec = {k: source.averaged_spectrum for k, source in self.loads.items()}
scale, off, Tu, TC, TS = rcf.get_calibration_quantities_iterative(
self.freq.freq_recentred,
temp_raw=ave_spec,
gamma_rec=self.receiver_s11,
gamma_ant=self.s11_correction_models,
temp_ant=self.source_thermistor_temps,
cterms=self.cterms,
wterms=self.wterms,
temp_amb_internal=self.t_load,
)
return scale, off, Tu, TC, TS
@cached_property
def C1_poly(self): # noqa: N802
"""`np.poly1d` object describing the Scaling calibration coefficient C1.
The polynomial is defined to act on normalized frequencies such that `freq.min`
and `freq.max` map to -1 and 1 respectively. Use :func:`~C1` as a direct
function on frequency.
"""
return self._calibration_coefficients[0]
@cached_property
def C2_poly(self): # noqa: N802
"""`np.poly1d` object describing the offset calibration coefficient C2.
The polynomial is defined to act on normalized frequencies such that `freq.min`
and `freq.max` map to -1 and 1 respectively. Use :func:`~C2` as a direct
function on frequency.
"""
return self._calibration_coefficients[1]
@cached_property
def Tunc_poly(self): # noqa: N802
"""`np.poly1d` object describing the uncorrelated noise-wave parameter, Tunc.
The polynomial is defined to act on normalized frequencies such that `freq.min`
and `freq.max` map to -1 and 1 respectively. Use :func:`~Tunc` as a direct
function on frequency.
"""
return self._calibration_coefficients[2]
@cached_property
def Tcos_poly(self): # noqa: N802
"""`np.poly1d` object describing the cosine noise-wave parameter, Tcos.
The polynomial is defined to act on normalized frequencies such that `freq.min`
and `freq.max` map to -1 and 1 respectively. Use :func:`~Tcos` as a direct
function on frequency.
"""
return self._calibration_coefficients[3]
@cached_property
def Tsin_poly(self): # noqa: N802
"""`np.poly1d` object describing the sine noise-wave parameter, Tsin.
The polynomial is defined to act on normalized frequencies such that `freq.min`
and `freq.max` map to -1 and 1 respectively. Use :func:`~Tsin` as a direct
function on frequency.
"""
return self._calibration_coefficients[4]
[docs] def C1(self, f: tp.FreqType | None = None): # noqa: N802
"""
Scaling calibration parameter.
Parameters
----------
f : array-like
The frequencies at which to evaluate C1. By default, the frequencies of this
instance.
"""
if hasattr(self, "_injected_c1") and self._injected_c1 is not None:
return np.array(self._injected_c1)
fnorm = self.freq.freq_recentred if f is None else self.freq.normalize(f)
return self.C1_poly(fnorm)
[docs] def C2(self, f: tp.FreqType | None = None): # noqa: N802
"""
Offset calibration parameter.
Parameters
----------
f : array-like
The frequencies at which to evaluate C2. By default, the frequencies of this
instance.
"""
if hasattr(self, "_injected_c2") and self._injected_c2 is not None:
return np.array(self._injected_c2)
fnorm = self.freq.freq_recentred if f is None else self.freq.normalize(f)
return self.C2_poly(fnorm)
[docs] def Tunc(self, f: tp.FreqType | None = None): # noqa: N802
"""
Uncorrelated noise-wave parameter.
Parameters
----------
f : array-like
The frequencies at which to evaluate Tunc. By default, the frequencies of
thisinstance.
"""
if hasattr(self, "_injected_t_unc") and self._injected_t_unc is not None:
return np.array(self._injected_t_unc)
fnorm = self.freq.freq_recentred if f is None else self.freq.normalize(f)
return self.Tunc_poly(fnorm)
[docs] def Tcos(self, f: tp.FreqType | None = None): # noqa: N802
"""
Cosine noise-wave parameter.
Parameters
----------
f : array-like
The frequencies at which to evaluate Tcos. By default, the frequencies of
this instance.
"""
if hasattr(self, "_injected_t_cos") and self._injected_t_cos is not None:
return np.array(self._injected_t_cos)
fnorm = self.freq.freq_recentred if f is None else self.freq.normalize(f)
return self.Tcos_poly(fnorm)
[docs] def Tsin(self, f: tp.FreqType | None = None): # noqa: N802
"""
Sine noise-wave parameter.
Parameters
----------
f : array-like
The frequencies at which to evaluate Tsin. By default, the frequencies of
this instance.
"""
if hasattr(self, "_injected_t_sin") and self._injected_t_sin is not None:
return np.array(self._injected_t_sin)
fnorm = self.freq.freq_recentred if f is None else self.freq.normalize(f)
return self.Tsin_poly(fnorm)
@cached_property
def receiver_s11(self):
"""The corrected S11 of the LNA evaluated at the data frequencies."""
if hasattr(self, "_injected_lna_s11") and self._injected_lna_s11 is not None:
return self._injected_lna_s11
else:
return self.receiver.s11_model(self.freq.freq.to_value("MHz"))
[docs] def get_linear_coefficients(self, load: Load | str):
"""
Calibration coefficients a,b such that T = aT* + b (derived from Eq. 7).
Parameters
----------
load : str or :class:`Load`
The load for which to get the linear coefficients.
"""
if isinstance(load, str):
load_s11 = self.s11_correction_models[load]
elif load.load_name in self.s11_correction_models:
load_s11 = self.s11_correction_models[load.load_name]
else:
load_s11 = load.s11_model(self.freq.freq.to_value("MHz"))
return rcf.get_linear_coefficients(
load_s11,
self.receiver_s11,
self.C1(self.freq.freq),
self.C2(self.freq.freq),
self.Tunc(self.freq.freq),
self.Tcos(self.freq.freq),
self.Tsin(self.freq.freq),
t_load=self.t_load,
)
[docs] def calibrate(self, load: Load | str, q=None, temp=None):
"""
Calibrate the temperature of a given load.
Parameters
----------
load : :class:`Load` or str
The load to calibrate.
Returns
-------
array : calibrated antenna temperature in K, len(f).
"""
load = self._load_str_to_load(load)
a, b = self.get_linear_coefficients(load)
if q is not None:
temp = self.t_load_ns * q + self.t_load
elif temp is None:
temp = load.averaged_spectrum
return a * temp + b
def _load_str_to_load(self, load: Load | str):
if isinstance(load, str):
try:
load = self.loads[load]
except (AttributeError, KeyError):
raise AttributeError(
f"load must be a Load object or a string (one of {self.load_names})"
)
else:
assert isinstance(
load, Load
), f"load must be a Load instance, got the {load} {type(Load)}"
return load
[docs] def decalibrate(self, temp: np.ndarray, load: Load | str, freq: np.ndarray = None):
"""
Decalibrate a temperature spectrum, yielding uncalibrated T*.
Parameters
----------
temp : array_like
A temperature spectrum, with the same length as `freq.freq`.
load : str or :class:`Load`
The load to calibrate.
freq : array-like
The frequencies at which to decalibrate. By default, the frequencies of the
instance.
Returns
-------
array_like : T*, the normalised uncalibrated temperature.
"""
if freq is None:
freq = self.freq.freq
if freq.min() < self.freq.freq.min():
warnings.warn(
"The minimum frequency is outside the calibrated range "
f"({self.freq.freq.min()} - {self.freq.freq.max()} MHz)"
)
if freq.max() > self.freq.freq.max():
warnings.warn("The maximum frequency is outside the calibrated range ")
a, b = self.get_linear_coefficients(load)
return (temp - b) / a
[docs] def get_K(
self, freq: tp.FreqType | None = None
) -> dict[str, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
"""Get the source-S11-dependent factors of Monsalve (2017) Eq. 7."""
if freq is None:
freq = self.freq.freq
gamma_ants = self.s11_correction_models
else:
gamma_ants = {
name: source.s11_model(freq.to_value("MHz"))
for name, source in self.loads.items()
}
lna_s11 = self.receiver.s11_model(freq.to_value("MHz"))
return {
name: rcf.get_K(gamma_rec=lna_s11, gamma_ant=gamma_ant)
for name, gamma_ant in gamma_ants.items()
}
[docs] def plot_calibrated_temp(
self,
load: Load | str,
bins: int = 2,
fig=None,
ax=None,
xlabel=True,
ylabel=True,
label: str = "",
as_residuals: bool = False,
load_in_title: bool = False,
rms_in_label: bool = True,
):
"""
Make a plot of calibrated temperature for a given source.
Parameters
----------
load : :class:`~LoadSpectrum` instance
Source to plot.
bins : int
Number of bins to smooth over (std of Gaussian kernel)
fig : Figure
Optionally provide a matplotlib figure to add to.
ax : Axis
Optionally provide a matplotlib Axis to add to.
xlabel : bool
Whether to write the x-axis label
ylabel : bool
Whether to write the y-axis label
Returns
-------
fig :
The matplotlib figure that was created.
"""
load = self._load_str_to_load(load)
if fig is None and ax is None:
fig, ax = plt.subplots(1, 1, facecolor="w")
# binning
temp_calibrated = self.calibrate(load)
if bins > 0:
freq_ave_cal = bin_array(temp_calibrated, size=bins)
f = bin_array(self.freq.freq.to_value("MHz"), size=bins)
else:
freq_ave_cal = temp_calibrated
f = self.freq.freq.to_value("MHz")
freq_ave_cal[np.isinf(freq_ave_cal)] = np.nan
rms = np.sqrt(np.mean((freq_ave_cal - np.mean(freq_ave_cal)) ** 2))
ax.plot(
f,
freq_ave_cal,
label=f"Calibrated {load.load_name} [RMS = {rms:.3f}]",
)
temp_ave = self.source_thermistor_temps.get(load.load_name, load.temp_ave)
ax.plot(
self.freq.freq,
temp_ave,
color="C2",
label="Average thermistor temp",
)
ax.set_ylim([np.nanmin(freq_ave_cal), np.nanmax(freq_ave_cal)])
if xlabel:
ax.set_xlabel("Frequency [MHz]")
if ylabel:
ax.set_ylabel("Temperature [K]")
plt.ticklabel_format(useOffset=False)
ax.grid()
ax.legend()
return plt.gcf()
[docs] def get_load_residuals(self):
"""Get residuals of the calibrated temperature for a each load."""
return {
name: self.calibrate(load) - load.temp_ave
for name, load in self.loads.items()
}
[docs] def get_rms(self, smooth: int = 4):
"""Return a dict of RMS values for each source.
Parameters
----------
smooth : int
The number of bins over which to smooth residuals before taking the RMS.
"""
resids = self.get_load_residuals()
out = {}
for name, res in resids.items():
if smooth > 1:
res = convolve(res, Gaussian1DKernel(stddev=smooth), boundary="extend")
out[name] = np.sqrt(np.nanmean(res**2))
return out
[docs] def plot_calibrated_temps(self, bins=64, fig=None, ax=None, **kwargs):
"""
Plot all calibrated temperatures in a single figure.
Parameters
----------
bins : int
Number of bins in the smoothed spectrum
Returns
-------
fig :
Matplotlib figure that was created.
"""
if fig is None or ax is None or len(ax) != len(self.loads):
fig, ax = plt.subplots(
len(self.loads),
1,
sharex=True,
gridspec_kw={"hspace": 0.05},
figsize=(10, 12),
)
for i, source in enumerate(self.loads):
self.plot_calibrated_temp(
source,
bins=bins,
fig=fig,
ax=ax[i],
xlabel=i == (len(self.loads) - 1),
)
fig.suptitle("Calibrated Temperatures for Calibration Sources", fontsize=15)
return fig
[docs] def plot_coefficients(self, fig=None, ax=None):
"""
Make a plot of the calibration models, C1, C2, Tunc, Tcos and Tsin.
Parameters
----------
fig : Figure
Optionally pass a matplotlib figure to add to.
ax : Axis
Optionally pass a matplotlib axis to pass to. Must have 5 axes.
"""
if fig is None or ax is None:
fig, ax = plt.subplots(
5, 1, facecolor="w", gridspec_kw={"hspace": 0.05}, figsize=(10, 9)
)
labels = [
"Scale ($C_1$)",
"Offset ($C_2$) [K]",
r"$T_{\rm unc}$ [K]",
r"$T_{\rm cos}$ [K]",
r"$T_{\rm sin}$ [K]",
]
for i, (kind, label) in enumerate(
zip(["C1", "C2", "Tunc", "Tcos", "Tsin"], labels)
):
ax[i].plot(self.freq.freq, getattr(self, kind)())
ax[i].set_ylabel(label, fontsize=13)
ax[i].grid()
plt.ticklabel_format(useOffset=False)
if i == 4:
ax[i].set_xlabel("Frequency [MHz]", fontsize=13)
fig.suptitle("Calibration Parameters", fontsize=15)
return fig
[docs] def clone(self, **kwargs):
"""Clone the instance, updating some parameters.
Parameters
----------
kwargs :
All parameters to be updated.
"""
return attr.evolve(self, **kwargs)
[docs] def write(self, filename: str | Path):
"""
Write all information required to calibrate a new spectrum to file.
Parameters
----------
filename : path
The filename to write to.
"""
# TODO: this is *not* all the metadata available when using edges-io. We should
# build a better system of maintaining metadata in subclasses to be used here.
with h5py.File(filename, "w") as fl:
# Write attributes
fl.attrs["cterms"] = self.cterms
fl.attrs["wterms"] = self.wterms
fl.attrs["t_load"] = self.open.spectrum.t_load
fl.attrs["t_load_ns"] = self.open.spectrum.t_load_ns
fl["C1"] = self.C1_poly.coefficients
fl["C2"] = self.C2_poly.coefficients
fl["Tunc"] = self.Tunc_poly.coefficients
fl["Tcos"] = self.Tcos_poly.coefficients
fl["Tsin"] = self.Tsin_poly.coefficients
hickle.dump(self.freq, fl.create_group("frequencies"))
hickle.dump(
self.receiver,
fl.create_group("receiver_s11"),
)
hickle.dump(
self.internal_switch,
fl.create_group("internal_switch"),
)
hickle.dump(
self.metadata,
fl.create_group("metadata"),
)
[docs] def to_calibrator(self):
"""Directly create a :class:`Calibrator` object without writing to file."""
return Calibrator(
cterms=self.cterms,
wterms=self.wterms,
t_load=self.t_load,
t_load_ns=self.t_load_ns,
C1=self.C1_poly,
C2=self.C2_poly,
Tunc=self.Tunc_poly,
Tcos=self.Tcos_poly,
Tsin=self.Tsin_poly,
freq=self.freq,
receiver_s11=self.receiver.s11_model,
internal_switch=self.internal_switch,
metadata=self.metadata,
)
[docs] def inject(
self,
lna_s11: np.ndarray = None,
source_s11s: dict[str, np.ndarray] = None,
c1: np.ndarray = None,
c2: np.ndarray = None,
t_unc: np.ndarray = None,
t_cos: np.ndarray = None,
t_sin: np.ndarray = None,
averaged_spectra: dict[str, np.ndarray] = None,
thermistor_temp_ave: dict[str, np.ndarray] = None,
) -> CalibrationObservation:
"""Make a new :class:`CalibrationObservation` based on this, with injections.
Parameters
----------
lna_s11
The LNA S11 as a function of frequency to inject.
source_s11s
Dictionary of ``{source: S11}`` for each source to inject.
c1
Scaling parameter as a function of frequency to inject.
c2 : [type], optional
Offset parameter to inject as a function of frequency.
t_unc
Uncorrelated temperature to inject (as function of frequency)
t_cos
Correlated temperature to inject (as function of frequency)
t_sin
Correlated temperature to inject (as function of frequency)
averaged_spectra
Dictionary of ``{source: spectrum}`` for each source to inject.
Returns
-------
:class:`CalibrationObservation`
A new observation object with the injected models.
"""
new = self.clone()
new._injected_lna_s11 = lna_s11
new._injected_source_s11s = source_s11s
new._injected_c1 = c1
new._injected_c2 = c2
new._injected_t_unc = t_unc
new._injected_t_cos = t_cos
new._injected_t_sin = t_sin
new._injected_averaged_spectra = averaged_spectra
new._injected_source_temps = thermistor_temp_ave
return new
[docs] @classmethod
def from_yaml(cls, config: tp.PathLike | dict, obs_path: tp.PathLike | None = None):
"""Create the calibration observation from a YAML configuration."""
if not isinstance(config, dict):
with open(config) as yml:
config = ayaml.load(yml)
iokw = config.pop("data", {})
if not obs_path:
obs_path = iokw.pop("path")
from_def = iokw.pop("compile_from_def", False)
if from_def:
io_obs = io.CalibrationObservation.from_def(obs_path, **iokw)
else:
io_obs = io.CalibrationObservation(obs_path, **iokw)
return cls.from_io(io_obs, **config)
[docs]@hickleable()
@attr.s(kw_only=True)
class Calibrator:
freq: FrequencyRange = attr.ib()
cterms: int = attr.ib()
wterms: int = attr.ib()
_C1: Callable[[np.ndarray], np.ndarray] = attr.ib()
_C2: Callable[[np.ndarray], np.ndarray] = attr.ib()
_Tunc: Callable[[np.ndarray], np.ndarray] = attr.ib()
_Tcos: Callable[[np.ndarray], np.ndarray] = attr.ib()
_Tsin: Callable[[np.ndarray], np.ndarray] = attr.ib()
_receiver_s11: Callable[[np.ndarray], np.ndarray] = attr.ib()
internal_switch = attr.ib()
t_load: float = attr.ib(300)
t_load_ns: float = attr.ib(350)
metadata: dict = attr.ib(default=attr.Factory(dict))
def __attrs_post_init__(self):
"""Initialize properties of the class."""
for key in ["C1", "C2", "Tunc", "Tcos", "Tsin"]:
setattr(self, key, partial(self._call_func, key=key, norm=True))
for key in [
"receiver_s11",
]:
setattr(self, key, partial(self._call_func, key=key, norm=False))
[docs] def clone(self, **kwargs):
"""Clone the instance with new parameters."""
return attr.evolve(self, **kwargs)
@internal_switch.validator
def _isw_vld(self, att, val):
if isinstance(val, s11.InternalSwitch):
return
for key in ("s11", "s12", "s22"):
if not hasattr(val, f"{key}_model") or not callable(
getattr(val, f"{key}_model")
):
raise ValueError(f"internal_switch must provide {key}_model method")
def _call_func(self, freq: tp.FreqType | None = None, *, key=None, norm=False):
if freq is None:
freq = self.freq.freq
if not hasattr(freq, "unit"):
raise ValueError("freq must have units of frequency")
if norm:
freq = self.freq.normalize(freq)
else:
freq = freq.to_value("MHz")
return getattr(self, "_" + key)(freq)
[docs] @classmethod
def from_calobs_file(cls, path: tp.PathLike) -> Calibrator:
"""Generate from calobs file."""
calobs = hickle.load(path)
return calobs.to_calibrator()
[docs] @classmethod
def from_calfile(cls, path: tp.PathLike) -> Calibrator:
"""Generate from calfile."""
with h5py.File(path, "r") as fl:
cterms = fl.attrs["cterms"]
wterms = fl.attrs["wterms"]
t_load = fl.attrs["t_load"]
t_load_ns = fl.attrs["t_load_ns"]
C1 = np.poly1d(fl["C1"])
C2 = np.poly1d(fl["C2"])
Tunc = np.poly1d(fl["Tunc"])
Tcos = np.poly1d(fl["Tcos"])
Tsin = np.poly1d(fl["Tsin"])
freq = hickle.load(fl["frequencies"])
receiver_s11 = hickle.load(fl["receiver_s11"])
internal_switch = hickle.load(fl["internal_switch"])
metadata = hickle.load(fl["metadata"])
return cls(
cterms=cterms,
wterms=wterms,
t_load=t_load,
t_load_ns=t_load_ns,
C1=C1,
C2=C2,
Tunc=Tunc,
Tcos=Tcos,
Tsin=Tsin,
freq=freq,
receiver_s11=receiver_s11.s11_model,
internal_switch=internal_switch,
metadata=metadata,
)
[docs] @classmethod
def from_old_calfile(cls, path: tp.PathLike) -> Calibrator:
"""Read from older calfiles."""
with h5py.File(path, "r") as fl:
cterms = int(fl.attrs["cterms"])
wterms = int(fl.attrs["wterms"])
t_load = fl.attrs.get("t_load", 300)
t_load_ns = fl.attrs.get("t_load_ns", 400)
C1_poly = np.poly1d(fl["C1"][...])
C2_poly = np.poly1d(fl["C2"][...])
Tcos_poly = np.poly1d(fl["Tcos"][...])
Tsin_poly = np.poly1d(fl["Tsin"][...])
Tunc_poly = np.poly1d(fl["Tunc"][...])
freq = FrequencyRange(fl["frequencies"][...] * un.MHz)
try:
metadata = dict(fl["metadata"].attrs)
except KeyError:
# For backwards compat
metadata = {}
_lna_s11_rl = Spline(freq.freq.to_value("MHz"), fl["lna_s11_real"][...])
_lna_s11_im = Spline(freq.freq.to_value("MHz"), fl["lna_s11_imag"][...])
_intsw_s11_rl = Spline(freq.freq, fl["internal_switch_s11_real"][...])
_intsw_s11_im = Spline(freq.freq, fl["internal_switch_s11_imag"][...])
_intsw_s12_rl = Spline(freq.freq, fl["internal_switch_s12_real"][...])
_intsw_s12_im = Spline(freq.freq, fl["internal_switch_s12_imag"][...])
_intsw_s22_rl = Spline(freq.freq, fl["internal_switch_s22_real"][...])
_intsw_s22_im = Spline(freq.freq, fl["internal_switch_s22_imag"][...])
internal_switch = SimpleNamespace(
s11_model=lambda freq: _intsw_s11_rl(freq) + _intsw_s11_im(freq) * 1j,
s12_model=lambda freq: _intsw_s12_rl(freq) + _intsw_s12_im(freq) * 1j,
s22_model=lambda freq: _intsw_s22_rl(freq) + _intsw_s22_im(freq) * 1j,
)
return cls(
C1=C1_poly,
C2=C2_poly,
Tunc=Tunc_poly,
Tcos=Tcos_poly,
Tsin=Tsin_poly,
freq=freq,
receiver_s11=lambda x: _lna_s11_rl(x) + 1j * _lna_s11_im(x),
internal_switch=internal_switch,
t_load=t_load,
t_load_ns=t_load_ns,
cterms=cterms,
wterms=wterms,
metadata=metadata,
)
[docs] @classmethod
def from_calobs(cls, calobs: CalibrationObservation) -> Calibrator:
"""Generate a :class:`Calibration` from an in-memory observation."""
return calobs.to_calibrator()
def _linear_coefficients(self, freq, ant_s11):
return rcf.get_linear_coefficients(
ant_s11,
self.receiver_s11(freq),
self.C1(freq),
self.C2(freq),
self.Tunc(freq),
self.Tcos(freq),
self.Tsin(freq),
self.t_load,
)
[docs] def calibrate_temp(self, freq: np.ndarray, temp: np.ndarray, ant_s11: np.ndarray):
"""
Calibrate given uncalibrated spectrum.
Parameters
----------
freq : np.ndarray
The frequencies at which to calibrate
temp : np.ndarray
The temperatures to calibrate (in K).
ant_s11 : np.ndarray
The antenna S11 for the load.
Returns
-------
temp : np.ndarray
The calibrated temperature.
"""
a, b = self._linear_coefficients(freq, ant_s11)
return temp * a + b
[docs] def decalibrate_temp(self, freq, temp, ant_s11):
"""
De-calibrate given calibrated spectrum.
Parameters
----------
freq : np.ndarray
The frequencies at which to calibrate
temp : np.ndarray
The temperatures to calibrate (in K).
ant_s11 : np.ndarray
The antenna S11 for the load.
Returns
-------
temp : np.ndarray
The calibrated temperature.
Notes
-----
Using this and then :method:`calibrate_temp` immediately should be an identity
operation.
"""
a, b = self._linear_coefficients(freq, ant_s11)
return (temp - b) / a
[docs] def calibrate_Q(
self, freq: np.ndarray, q: np.ndarray, ant_s11: np.ndarray
) -> np.ndarray:
"""
Calibrate given power ratio spectrum.
Parameters
----------
freq : np.ndarray
The frequencies at which to calibrate
q : np.ndarray
The power ratio to calibrate.
ant_s11 : np.ndarray
The antenna S11 for the load.
Returns
-------
temp : np.ndarray
The calibrated temperature.
"""
uncal_temp = self.t_load_ns * q + self.t_load
return self.calibrate_temp(freq, uncal_temp, ant_s11)