"""
******************************
Support to Time-Frequency data
******************************
This module contains all the relevant functions to read
and process the NenuFAR beamformed time-frequency data.
The functions called by the various :class:`~nenupy.io.tf.TFTask`
are listed here.
This module's content is not meant to be used outside of this
scope unless the user knows what they are doing.
.. seealso::
:ref:`tf_reading_doc`
"""
import numpy as np
import os
import dask.array as da
from dask.diagnostics import ProgressBar
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time, TimeDelta
from typing import Union, List, Tuple, Any
from functools import partial
from abc import ABC, abstractmethod
import copy
import h5py
import logging
log = logging.getLogger(__name__)
from nenupy.astro import dispersion_delay, faraday_angle, parallactic_angle
from nenupy.astro.beam_correction import compute_jones_matrices, compute_projection_corrections
__all__ = [
"apply_dreambeam_corrections",
"blocks_to_tf_data",
"compute_spectra_frequencies",
"compute_spectra_time",
"compute_stokes_parameters",
"correct_bandpass",
"correct_parallactic",
"crop_subband_edges",
"de_disperse_array",
"de_faraday_data",
"flatten_subband",
"get_bandpass",
"mitigate_rfi_along_axis",
"plot_dynamic_spectrum",
"plot_lightcurve",
"plot_spectrum",
"polarization_angle",
"rebin_along_dimension",
"remove_channels_per_subband",
"reshape_to_subbands",
"sort_beam_edges",
"spectra_data_to_matrix",
"store_dask_tf_data",
"TFPipelineParameters",
"ReducedSpectraSlice",
"ReducedSpectra",
]
# ============================================================= #
# ---------------- apply_dreambeam_corrections ---------------- #
[docs]
def apply_dreambeam_corrections(
time_unix: np.ndarray,
frequency_hz: np.ndarray,
data: np.ndarray,
dt_sec: float,
time_step_sec: float,
n_channels: int,
skycoord: SkyCoord,
parallactic: bool = True,
) -> np.ndarray:
r"""
Correct for polarization systematics using DreamBeam.
The module `DreamBeam <https://github.com/2baOrNot2ba/dreamBeam>`_ is required for this function.
It generates the Jones matrices :math:`\mathbf{J}` affecting the light received by NenuFAR's :math:`X` and :math:`Y` dipoles, taking into account projection, parallactic angle and beam effects.
.. math::
\mathbf{d}_{\rm corr}(t, \nu) =
\mathbf{J}(t, \nu)
\begin{pmatrix}
X(t, \nu)\overline{X}(t, \nu) & X(t, \nu)\overline{Y}(t, \nu)\\
Y(t, \nu)\overline{X}(t, \nu) & Y(t, \nu)\overline{Y}(t, \nu)
\end{pmatrix}
\overline{\mathbf{J}^\top}(t, \nu)
Parameters
----------
time_unix : :class:`~numpy.ndarray`
Time axis of size :math:`n_t` corresponding to the first dimension of ``data`` in unix format.
frequency_hz : :class:`~numpy.ndarray`
Frequency axis of size :math:`n_{\nu}` corresponding to the second dimension of ``data`` in Hz.
data : :class:`~numpy.ndarray`
Data to be corrected, shaped as :math:`(n_t, n_{\nu}, 2, 2)`, where the two by two matrices are :math:`((X\overline{X}\ X\overline{Y}), (Y\overline{X}\ Y\overline{Y}))`.
dt_sec : `float`
Time resolution of the ``data`` in seconds.
time_step_sec : `float`
Time resolution at which the Jones matrices are computed. Unless required, it is typiclaly sufficient to request a solution every few seconds.
n_channels : `int`
Number of channnels per subband. It is expected that :math:`n_{\nu} = n_c \times n_{\rm sb}`.
skycoord : :class:`~astropy.coordinates.SkyCoord`
Sky coordinates of the target tracked during the observation.
parallactic : `bool`, optional
If set to ``True``, the Jones matrices take into account parallactic angle rotation effects (see for example :func:`~nenupy.io.tf_utils.correct_parallactic`), by default `True`
Returns
-------
:class:`~numpy.ndarray`
The corrected data :math:`\mathbf{d}_{\rm corr}(t, \nu)` shaped as as :math:`(n_t, n_{\nu}, 2, 2)`.
Raises
------
ValueError
If ``time_unix`` and/or ``frequency_hz`` do not match first and second dimensions of ``data``.
Or if ``frequency_hz``'s size is not divisible by ``n_channels``.
See Also
--------
:func:`~nenupy.astro.beam_correction.compute_jones_matrices`, :meth:`~nenupy.io.tf.TFTask.correct_polarization`
"""
log.info("Applying DreamBeam corrections...")
# Basic checks to make sure the dimensions are correct
freq_size = frequency_hz.size
time_size = time_unix.size
if time_size != data.shape[0]:
raise ValueError("There is a problem in the time dimension!")
if (freq_size != data.shape[1]) or (freq_size % n_channels != 0):
raise ValueError("There is a problem in the frequency dimension!")
n_subbands = int(freq_size / n_channels)
# Compute the number of time samples that will be corrected together
time_group_size = int(np.round(time_step_sec / dt_sec))
log.debug(
f"\tGroups of {time_group_size} time blocks will be corrected altogether ({dt_sec*time_group_size} sec resolution)."
)
n_time_groups = time_size // time_group_size
leftover_time_samples = time_size % time_group_size
if n_time_groups == 0:
log.warning(
f"The selected time interval for Jones solutions ({time_step_sec}) is greater than the selected time window ({Time(time_unix[0], format='unix').isot} to {Time(time_unix[-1], format='unix').isot}). "
"A single solution will be applied."
)
n_time_groups = 1
time_group_size = time_size
leftover_time_samples = 0
# Computing DreamBeam matrices
db_time, db_frequency, db_jones = compute_jones_matrices(
start_time=Time(time_unix[0], format="unix", precision=7),
time_step=TimeDelta(time_group_size * dt_sec, format="sec"),
duration=TimeDelta(time_unix[-1] - time_unix[0], format="sec"),
skycoord=skycoord,
parallactic=parallactic,
)
# db_time, db_frequency, db_jones = compute_projection_corrections(
# start_time=Time(time_unix[0], format="unix", precision=7),
# time_step=TimeDelta(time_group_size * dt_sec, format="sec"),
# duration=TimeDelta(time_unix[-1] - time_unix[0], format="sec"),
# skycoord=skycoord,
# parallactic=parallactic,
# )
db_time = db_time.unix
db_frequency = db_frequency.to_value(u.Hz)
db_jones = np.swapaxes(db_jones, 0, 1) # swap frequency and time axes
# Reshape the data at the time and frequency resolutions
# Take into account leftover times
if leftover_time_samples != 0:
data_leftover = data[-leftover_time_samples:, ...].reshape(
(leftover_time_samples, n_subbands, n_channels, 2, 2)
)
data = data[: time_size - leftover_time_samples, ...].reshape(
(n_time_groups, time_group_size, n_subbands, n_channels, 2, 2)
)
# Compute the frequency indices to select the corresponding Jones matrices
subband_start_frequencies = frequency_hz.reshape((n_subbands, n_channels))[:, 0]
freq_start_idx = np.argmax(db_frequency >= subband_start_frequencies[0])
freq_stop_idx = db_frequency.size - np.argmax(
db_frequency[::-1] < subband_start_frequencies[-1]
)
# Do the same with the time
group_start_time = time_unix[: time_size - leftover_time_samples].reshape(
(n_time_groups, time_group_size)
)[:, 0]
time_start_idx = np.argmax(db_time >= group_start_time[0])
# time_stop_idx = db_time.size - np.argmax(db_time[::-1] < group_start_time[-1])
time_stop_idx = time_start_idx + n_time_groups - 1
jones = db_jones[
time_start_idx : time_stop_idx + 1, freq_start_idx : freq_stop_idx + 1, :, :
][:, None, :, None, :, :]
if leftover_time_samples != 0:
jones_leftover = db_jones[-1, freq_start_idx : freq_stop_idx + 1, :, :][
None, :, None, :, :
]
# Invert the matrices that will be used to correct the observed signals
# Jones matrices are at the subband resolution and an arbitrary time resolution
jones = np.linalg.inv(jones)
if leftover_time_samples != 0:
try:
jones_leftover = np.linalg.inv(jones_leftover)
except:
log.warning("Set leftover dreambeam matrix to identity.")
jones_leftover[...] = np.broadcast_to(np.identity(2)[None, None, None, ...], jones_leftover.shape)
# Compute the Hermitian matrices
jones_transpose = np.swapaxes(jones, -2, -1)
jones_hermitian = np.conjugate(jones_transpose)
if leftover_time_samples != 0:
jones_leftover_transpose = np.swapaxes(jones_leftover, -2, -1)
jones_leftover_hermitian = np.conjugate(jones_leftover_transpose)
# This would raise an indexerror if jones_values are at smaller t/f range than data
if leftover_time_samples != 0:
return np.concatenate(
(
np.matmul(jones, np.matmul(data, jones_hermitian)).reshape(
(time_size - leftover_time_samples, freq_size, 2, 2)
),
np.matmul(
jones_leftover, np.matmul(data_leftover, jones_leftover_hermitian)
).reshape((leftover_time_samples, freq_size, 2, 2)),
),
axis=0,
)
else:
return np.matmul(
jones, np.matmul(data, jones_hermitian)
).reshape(
(time_size - leftover_time_samples, freq_size, 2, 2)
)
# ============================================================= #
# ------------- bandpass_correction_coefficients -------------- #
[docs]
def bandpass_correction_coefficients(frequency: u.Quantity, lna_filter: int) -> Tuple[np.ndarray, np.ndarray]:
r"""Compute the coefficients to modify the shape of the bandpass response.
The bandpass response (see :func:`~nenupy.io.tf_utils.get_bandpass`) has been computed for NenuFAR.
It has been noted that the real subband shapes differ from the theoretical model.
This deformation is chromatic and depends on the selected high-pass filter.
As a study realized on April 2026, the subbands are assumed to be deformed as :math:`{\rm slope} \times {\rm bandpass} + {\rm yintercept}`.
Coefficients :math:`{\rm slope} (\nu, {\rm filter})` and :math:`{\rm yintercept} (\nu, {\rm filter})` were derived from real data.
Parameters
----------
frequency : :class:`~astropy.units.Quantity`
Frequency at which the coefficients are computed. The lower the frequency, the higher the deformation.
lna_filter : `int`
NenuFAR high-pass filter used during the observation.
The higher the high-pass filter, the stronger the shape deformation.
Returns
-------
[:class:`~numpy.ndarray`, :class:`~numpy.ndarray`]
:math:`{\rm slope}` and :math:`{\rm yintercept}` for the selected ``frequency`` and ``lna_filter``.
Raises
------
ValueError
If the selected filter is different than 0, 1, 2 or 3 (corresponding to a 10, 15, 20, 25 MHz high-pass filter).
"""
if lna_filter not in [0, 1, 2, 3]:
raise ValueError(
"LNA filter should either be 0, 1, 2, or 3 "
"(respectiveley corresponding to a 10, 15, 20, 25 MHz high-pass filter.)."
)
# The sigmoid function represents the shape of both the slope and y-intercept curves vs. frequency
def sigmoid(xvals, a, b, c, k):
return b / (1 + np.exp(k * (xvals - a))) + c
def constant(xvals, a):
return np.ones(xvals.size) * a
def non_constant_sigmoid(xvals, a, b, c, k, d):
return b / (1 + np.exp(k * (xvals - a))) + c + d * xvals
def to_fit_sig_sig(xvals, threshold, a, b, c, k, a2, b2, c2, k2, d2):
result = np.ones(xvals.size)
low_f = xvals < threshold
high_f = xvals >= threshold
result[low_f] = sigmoid(xvals[low_f], a, b, c, k)
result[high_f] = non_constant_sigmoid(xvals[high_f], a2, b2, c2, k2, d2)
return result
def to_fit_const_sig(xvals, threshold, a, a2, b2, c2, k2, d2):
result = np.ones(xvals.size)
low_f = xvals < threshold
high_f = xvals >= threshold
result[low_f] = constant(xvals[low_f], a)
result[high_f] = non_constant_sigmoid(xvals[high_f], a2, b2, c2, k2, d2)
return result
correction_function = to_fit_sig_sig if lna_filter >= 2 else to_fit_const_sig
# The function coefficients were fitted on real data.
# fitted_parameters_slope_filter_3 = (17.92080798, 0.6247039, 0.41066223, -0.93380174)
# fitted_parameters_yintercept_filter_3 = (1.79374568e+01, 5.99856932e-01, 9.85121646e-03, 9.32503789e-01)
coefficients = {
"slope": {
0: (5.00000000e+01, 1.03693912e+00, 8.89487679e+01, 1.02863838e+00, 9.09061184e-02, 3.81796685e-01, -1.62453382e-03),
1: (5.00000000e+01, 1.03621147e+00, 8.64336414e+01, 5.27969788e-01, 5.95808501e-01, 4.22370139e-01, -1.70520031e-03),
2: (5.00000000e+01, 1.30641911e+01, 5.84742703e-01, 4.51767374e-01, -1.02227093e+00, 8.88508007e+01, 1.06722835e+00, 5.50610398e-02, 3.92651498e-01, -1.66074012e-03),
3: (5.00000000e+01, 1.79766585e+01, 6.26219238e-01, 4.09338837e-01, -9.41265763e-01, 8.67626131e+01, 5.83615866e-01, 5.40245636e-01, 4.15807316e-01, -1.73087766e-03)
},
"yintercept": {
0: (5.00000000e+01, 8.23885830e-03, 8.84722442e+01, -8.75842286e-01, 8.03931674e-01, 3.88727164e-01, 1.57796172e-03),
1: (5.00000000e+01, 8.91418553e-03, 8.62446152e+01, -4.87074122e-01, 4.11469802e-01, 4.27254949e-01, 1.64909055e-03),
2: (5.00000000e+01, 1.30006330e+01, 5.51584657e-01, 8.78717949e-03, 1.06913377e+00, 8.82938870e+01, -8.81479279e-01, 8.07125303e-01, 4.00112386e-01, 1.60945820e-03),
3: (5.00000000e+01, 1.79888717e+01, 6.01638449e-01, 9.68806278e-03, 9.37583260e-01, 8.64853961e+01, -5.25922533e-01, 4.49999393e-01, 4.22993203e-01, 1.67795904e-03)
}
}
slope_coefficients = correction_function(frequency.to_value(u.MHz), *coefficients["slope"][lna_filter])
yintercept_coefficients = correction_function(frequency.to_value(u.MHz), *coefficients["yintercept"][lna_filter])
return slope_coefficients, yintercept_coefficients
# ============================================================= #
# --------------------- blocks_to_tf_data --------------------- #
[docs]
def blocks_to_tf_data(data: da.Array, n_block_times: int, n_channels: int) -> da.Array:
"""Parse time-frequency data at the reading of a .spectra file.
Invert the halves of each beamlet and reshape the
array in 2D (time, frequency) or 4D (time, frequency, 2, 2).
Parameters
----------
data : :class:`~dask.array.Array`
Raw data. Number of dimensions should either be 4 (n_block, n_subband, n_time_per_block, n_channels) or 6 (n_block, n_subband, n_time_per_block, n_channels, 2, 2)
n_block_times : `int`
Number of time blocks
n_channels : `int`
Number of frequency channels in each beamlet
Returns
-------
:class:`~dask.array.Array`
Data reshaped
Raises
------
ValueError
Raised if n_channels is odd
IndexError
Raised if the number of dimensions of data is different than 4 or 6
Warning
-------
Usage only within :class:`~nenupy.io.tf.Spectra`.
Example
-------
.. code-block:: python
>>> from nenupy.io.tf_utils import blocks_to_tf_data
>>> import numpy as np
>>> n_block = 5
>>> n_subband = 32
>>> n_time_per_block = 24
>>> n_channels = 8
>>> data = np.ones(((n_block, n_subband, n_time_per_block, n_channels, 2, 2)))
>>> data_reshaped = blocks_to_tf_data(
data=data,
n_block_times=n_time_per_block,
n_channels=n_channels
)
>>> data_reshaped.shape
(120, 256, 2, 2)
"""
ntb, nfb = data.shape[:2]
n_times = ntb * n_block_times
n_freqs = nfb * n_channels
if n_channels % 2.0 != 0.0:
raise ValueError(f"Problem with n_channels value: {n_channels}!")
# Prepare the various shapes
temp_shape = (n_times, int(n_freqs / n_channels), 2, int(n_channels / 2))
final_shape = (n_times, n_freqs)
# Add dimension if this comes in eJones matrix
if data.ndim == 4:
pass
elif data.ndim == 6:
final_shape += (2, 2)
temp_shape += (2, 2)
else:
raise IndexError(f"The data has an unexpected shape of {data.shape}.")
# Swap the subband (1) and nffte (2) dimensions to group
# frequency and times dimensions together.
# Reshape in order to isolate the halves of every beamlet.
data = np.swapaxes(data, 1, 2).reshape(temp_shape)
# Invert the halves and reshape to the final form.
return data[:, :, ::-1, ...].reshape(final_shape)
# ============================================================= #
# ---------------- compute_spectra_frequencies ---------------- #
[docs]
def compute_spectra_frequencies(
subband_start_hz: np.ndarray, n_channels: int, frequency_step_hz: float
) -> da.Array:
"""Compute the frequency axis of a time-frequency file.
Re-construct the whole frequency range, knowing the starting frequency of
each sub-band, the number of channels per sub-band and the frequency
resolution.
Parameters
----------
subband_start_hz : :class:`~numpy.ndarray`
Array of sub-band starting frequencies
n_channels : `int`
Number of channels per subband
frequency_step_hz : `float`
Frequency resolution in Hz
Returns
-------
:class:`~dask.array.Array`
The frequency array (`Dask <https://docs.dask.org/en/stable/>`_ format), in Hz
Example
-------
.. code-block:: python
:emphasize-lines: 7,8,9,10,11
>>> from nenupy.io.tf_utils import compute_spectra_frequencies
>>> import astropy.units as u
>>> sb_start_freq = [50.1953125, 50.390625, 50.5859375, 50.781250] * u.MHz
>>> n_channels = 16
>>> df = 12.20703125 * u.kHz
>>> freq_axis = compute_spectra_frequencies(
subband_start_hz=sb_start_freq.to_value(u.Hz),
n_channels=n_channels,
frequency_step_hz=df.to_value(u.Hz)
)
>>> freq_axis.compute()
array([50097656.25, 50109863.28125,....50854492.1875, 50866699.21875])
"""
# Construct the frequency array
frequencies = da.tile(np.arange(n_channels) - n_channels / 2, subband_start_hz.size)
frequencies = frequencies.reshape((subband_start_hz.size, n_channels))
frequencies *= frequency_step_hz
frequencies += subband_start_hz[:, None]
frequencies = frequencies.ravel()
log.debug(f"\tFrequency axis computed (size={frequencies.size}).")
return frequencies
# ============================================================= #
# ------------------- compute_spectra_time -------------------- #
[docs]
def compute_spectra_time(
block_start_time_unix: np.ndarray, ntime_per_block: int, time_step_s: float
) -> da.Array:
"""Compute the time axis of a time-frequency file.
Re-construct the whole time range, knowing the starting unix time of
each time block, the number of time samples per block and the time
resolution.
Parameters
----------
block_start_time_unix : :class:`~numpy.ndarray`
Array of start times of each block, in UNIX format
ntime_per_block : `int`
Number of time samples per block
time_step_s : `float`
Time resolution in seconds
Returns
-------
:class:`~dask.array.Array`
The time array (`Dask <https://docs.dask.org/en/stable/>`_ format), in unix
Example
-------
.. code-block:: python
:emphasize-lines: 14,15,16,17,18
>>> from nenupy.io.tf_utils import compute_spectra_time
>>> from astropy.time import Time
>>> import astropy.units as u
>>> nffte = 42
>>> dt = 0.02097152 * u.s
>>> start_times = Time([
'2024-07-15T08:31:12.000', '2024-07-15T08:31:12.881',
'2024-07-15T08:31:13.762', '2024-07-15T08:31:14.642',
'2024-07-15T08:31:15.523', '2024-07-15T08:31:16.404',
'2024-07-15T08:31:17.285', '2024-07-15T08:31:18.166',
'2024-07-15T08:31:19.046', '2024-07-15T08:31:19.927'
])
>>> time_axis = compute_spectra_time(
block_start_time_unix=start_times.unix,
ntime_per_block=nffte,
time_step_s=dt.to_value(u.s)
)
>>> time_axis.compute()
array([1721032272.0, 1721032272.0209715, ...])
"""
# Construct the elapsed time per block (1D array)
time_seconds_per_block = da.arange(ntime_per_block, dtype="float64") * time_step_s
# Create the time ramp with broadcasting
unix_time = time_seconds_per_block[None, :] + block_start_time_unix[:, None]
# Return the flatten array
unix_time = unix_time.ravel()
log.debug(f"\tTime axis computed (size={unix_time.size}).")
return unix_time
# ============================================================= #
# ----------------- compute_stokes_parameters ----------------- #
[docs]
def compute_stokes_parameters(
data_array: np.ndarray, stokes: Union[List[str], str]
) -> np.ndarray:
r"""Compute the Stokes parameters from data organized in Jones matrices.
``data_array``'s last two dimensions are assumed to be:
.. math::
\mathbf{d} = \begin{pmatrix}
X\overline{X} & X\overline{Y} \\
Y\overline{X} & Y\overline{Y}
\end{pmatrix},
so that the Stokes parameters are computed such as:
.. math::
\begin{align}
I &= \Re(X\overline{X}) + \Re(Y\overline{Y})\\
Q &= \Re(X\overline{X}) - \Re(Y\overline{Y})\\
U &= 2\Re(X\overline{Y})\\
V &= -2\Im(X\overline{Y})\\
L &= \sqrt{U^2 + Q^2}
\end{align}
Parameters
----------
data_array : :class:`~numpy.ndarray`
The data array (a Dask :class:`~dask.array.Array` is also accepted),
the first dimensions may corresponds with anything (for e.g., time,
frequency, beam...) and are kept through the computation, the last
two dimensions must be the :math:`2 \times 2` Jones matrices.
stokes : List[`str`] or `str`
Stokes parameters to compute, if a list is given, the result will
store them in the same order in the last result dimension, available
values are "I", "Q", "U", "V", "L", "Q/I", "U/I", "V/I", "L/I".
Returns
-------
:class:`~numpy.ndarray`
New data array transformed in Stokes parameters,
the last two dimensions are replaced by a single dimension
listing in order the requested stokes parameters.
If the input data_array is a Dask :class:`~dask.array.Array`,
the returned result is of the same type.
Raises
------
Exception
Raised if the last two dimensions of data_array are different than (2, 2)
NotImplementedError
Raised if the requested Stokes parameter is not known
Example
-------
.. code-block:: python
>>> from nenupy.io.tf import Spectra
>>> from nenupy.io.tf_utils import compute_stokes_parameters
>>> sp = Spectra(".../my_file.spectra")
>>> result = compute_stokes_parameters(
data_array=sp.data[:1000, :100, :, :], # shape: (time, frequency, 2, 2)
stokes=["I", "U/I", "V/I"]
)
>>> result.shape
(1000, 100, 3)
"""
log.info(f"Computing Stokes parameters {stokes}...")
# Assert that the last dimensions are shaped like a cross correlation electric field matrix
if data_array.shape[-2:] != (2, 2):
raise Exception("The data_array last 2 dimensions are not of shape (2, 2).")
result = None
if isinstance(stokes, str):
# Make sure the Stokes iterable is a list and not just the string.
stokes = [stokes]
for stokes_i in stokes:
stokes_i = stokes_i.replace(" ", "").upper()
# Compute the correct Stokes value
if stokes_i == "I":
data_i = data_array[..., 0, 0].real + data_array[..., 1, 1].real
elif stokes_i == "Q":
data_i = data_array[..., 0, 0].real - data_array[..., 1, 1].real
elif stokes_i == "U":
data_i = data_array[..., 0, 1].real * 2
elif stokes_i == "V":
data_i = - data_array[..., 0, 1].imag * 2 # no negative sign? because data_array[..., 0, 1] = [XrYr + XiYi ; XrYi - XiYr] which is the opposite of XY*=YrXr+YiXi + i(XiYr - XrYi)
elif stokes_i == "L":
data_i = np.sqrt( (data_array[..., 0, 0].real - data_array[..., 1, 1].real)**2 + (data_array[..., 0, 1].real * 2)**2 )
elif stokes_i == "Q/I":
data_i = (data_array[..., 0, 0].real - data_array[..., 1, 1].real) / (
data_array[..., 0, 0].real + data_array[..., 1, 1].real
)
elif stokes_i == "U/I":
data_i = (
data_array[..., 0, 1].real
* 2
/ (data_array[..., 0, 0].real + data_array[..., 1, 1].real)
)
elif stokes_i == "V/I":
data_i = (
- data_array[..., 0, 1].imag
* 2
/ (data_array[..., 0, 0].real + data_array[..., 1, 1].real)
)
elif stokes_i == "L/I":
data_i = np.sqrt( (data_array[..., 0, 0].real - data_array[..., 1, 1].real)**2 + (data_array[..., 0, 1].real * 2)**2 ) / (
data_array[..., 0, 0].real + data_array[..., 1, 1].real
)
else:
raise NotImplementedError(f"Stokes parameter {stokes_i} unknown.")
log.info(f"\tStokes {stokes_i} computed.")
# Stack everything
if result is None:
result = np.expand_dims(data_i, axis=-1)
else:
result = np.concatenate([result, data_i[..., None]], axis=-1)
return result
# ============================================================= #
# --------------------- correct_bandpass ---------------------- #
[docs]
def correct_bandpass(data: np.ndarray, n_channels: int, lna_filter: int = None, frequency: u.Quantity = None) -> np.ndarray:
"""Correct the Polyphase-filter band-pass response at each sub-band.
This methods computes the bandpass theoretical response of sub-bands
made of ``n_channels`` and multiply the ``data`` by this reponse.
Parameters
----------
data : :class:`~numpy.ndarray`
The 4D data array (a Dask :class:`~dask.array.Array` is also accepted).
The first two dimensions are assumed to be corresponding to time and frequencies respectively.
The two others are the Jones matrices (2, 2).
n_channels : `int``
The number of channels per subband.
The second dimension of ``data`` should equal to the product of the number of channels with the number of subbands.
lna_filter : `int`, optional
NenuFAR LNA filter at which the data were acquired.
Its value must be 0, 1, 2 or 3 (corresponding to 10, 15, 20, or 25 MHz high-pass filter respectively).
The bandpass response is modified to account for the selected filter (with :func:`~nenupy.io.tf_utils.bandpass_correction_coefficients`).
By default, `None` (i.e. the bandpass response is taken as the output of :func:`~nenupy.io.tf_utils.get_bandpass`).
frequency : :class:`~astropy.units.Quantity`, optional
The frequency axis (should be of the same size than ``data`` 2nd dimension).
If ``lna_filter`` is set to `None`, this argument has no effect.
By default, `None`.
Returns
-------
:class:`~numpy.ndarray`
Bandpass response corrected data.
Raises
------
ValueError
Raised if the shape of data does not match the number of channels.
See Also
--------
:func:`~nenupy.io.tf_utils.get_bandpass`
"""
log.info("Correcting for bandpass...")
# Compute the bandpass
bandpass = get_bandpass(n_channels=n_channels)
# Reshape the data array to isolate individual subbands
n_times, n_freqs, _, _ = data.shape
if n_freqs % n_channels != 0:
raise ValueError(
"The frequency dimension of `data` doesn't match the argument `n_channels`."
)
data = data.reshape(
(n_times, int(n_freqs / n_channels), n_channels, 2, 2) # subband # channels
)
# Multiply the channels by the bandpass to correct them
if lna_filter is None:
data *= bandpass[None, None, :, None, None]
else:
slope_coeff, yinter_coeff = bandpass_correction_coefficients(
frequency=frequency.reshape(int(n_freqs / n_channels), n_channels)[:, 0],
lna_filter=lna_filter
)
corrected_bandpass = slope_coeff[:, None] / bandpass[None, :] + yinter_coeff[:, None]
data /= corrected_bandpass[None, :, :, None, None]
log.debug(f"\tEach subband corrected by the bandpass of size {bandpass.size}.")
# Re-reshape the data into time, frequency, (2, 2) array
return data.reshape((n_times, n_freqs, 2, 2))
# ============================================================= #
# -------------------- correct_parallactic -------------------- #
[docs]
def correct_parallactic(
time_unix: np.ndarray,
frequency_hz: np.ndarray,
data: np.ndarray,
dt_sec: float,
time_step_sec: float,
n_channels: int,
skycoord: SkyCoord
) -> np.ndarray:
# Basic checks to make sure the dimensions are correct
freq_size = frequency_hz.size
time_size = time_unix.size
if time_size != data.shape[0]:
raise ValueError("There is a problem in the time dimension!")
if (freq_size != data.shape[1]) or (freq_size % n_channels != 0):
raise ValueError("There is a problem in the frequency dimension!")
n_subbands = int(freq_size / n_channels)
# Compute the number of time samples that will be corrected together
time_group_size = int(np.round(time_step_sec / dt_sec))
log.debug(
f"\tGroups of {time_group_size} time blocks will be corrected altogether ({dt_sec*time_group_size} sec resolution)."
)
n_time_groups = time_size // time_group_size
leftover_time_samples = time_size % time_group_size
if n_time_groups == 0:
log.warning(
f"The selected time interval for Jones solutions ({time_step_sec}) is greater than the selected time window ({Time(time_unix[0], format='unix').isot} to {Time(time_unix[-1], format='unix').isot}). "
"A single solution will be applied."
)
n_time_groups = 1
time_group_size = time_size
leftover_time_samples = 0
# Build the time array for the Jones solutions
start_time = Time(time_unix[0], format="unix", precision=7)
jones_times = start_time + np.arange(n_time_groups + 1) * TimeDelta(time_group_size * dt_sec, format="sec")
jones_unix = jones_times.unix
# Reshape the data at the time and frequency resolutions
# Take into account leftover times
if leftover_time_samples != 0:
data_leftover = data[-leftover_time_samples:, ...].reshape(
(leftover_time_samples, n_subbands, n_channels, 2, 2)
)
data = data[: time_size - leftover_time_samples, ...].reshape(
(n_time_groups, time_group_size, n_subbands, n_channels, 2, 2)
)
# Do the same with the time
group_start_time = time_unix[: time_size - leftover_time_samples].reshape(
(n_time_groups, time_group_size)
)[:, 0]
time_start_idx = np.argmax(jones_unix >= group_start_time[0])
time_stop_idx = time_start_idx + n_time_groups - 1
par_angle = parallactic_angle(coordinates=skycoord, time=jones_times).rad
jones_parallactic = np.array([
[np.cos(par_angle), - np.sin(par_angle)],
[np.sin(par_angle), np.cos(par_angle)]
])
jones_parallactic = np.swapaxes(jones_parallactic, 2, 0)
# jones_parallactic = np.linalg.inv(jones_parallactic)
jones = jones_parallactic[
time_start_idx : time_stop_idx + 1, :, :
][:, None, None, None, :, :]
jones_leftover = jones_parallactic[
-1, :, :
][None, None, None, :, :]
# Compute the Hermitian matrices
jones_transpose = np.swapaxes(jones, -2, -1)
jones_hermitian = np.conjugate(jones_transpose)
if leftover_time_samples != 0:
jones_leftover_transpose = np.swapaxes(jones_leftover, -2, -1)
jones_leftover_hermitian = np.conjugate(jones_leftover_transpose)
if leftover_time_samples != 0:
return np.concatenate(
(
np.matmul(jones, np.matmul(data, jones_hermitian)).reshape(
(time_size - leftover_time_samples, freq_size, 2, 2)
),
np.matmul(
jones_leftover, np.matmul(data_leftover, jones_leftover_hermitian)
).reshape((leftover_time_samples, freq_size, 2, 2)),
),
axis=0
)
else:
return np.matmul(
jones, np.matmul(data, jones_hermitian)
).reshape(
(time_size - leftover_time_samples, freq_size, 2, 2)
)
# ============================================================= #
# -------------------- crop_subband_edges --------------------- #
[docs]
def crop_subband_edges(
data: np.ndarray,
n_channels: int,
lower_edge_channels: int = 0,
higher_edge_channels: int = 0,
) -> np.ndarray:
"""Set edge channels of each subband to `NaN`.
Each subband of ``data`` is determined thanks to ``n_channels``
(and the function :func:`~nenupy.io.tf_utils.reshape_to_subbands`).
This method is a bit faster than :func:`~nenupy.io.tf_utils.remove_channels_per_subband`
but it is restricted to edge channels. The other method is prefered for its polyvalence.
Parameters
----------
data : :class:`~numpy.ndarray`
Data to be corrected, must be at least two-dimensional, the first two dimensions being respectively the time and the frequency
n_channels : `int`
Number of channels per subband
lower_edge_channels : `int`
Number of channels to set to `NaN` for the lowest part of each subband, by default 0
higher_edge_channels : `int`
Number of channels to set to `NaN` for the highest part of each subband, by default 0
Returns
-------
:class:`~numpy.ndarray`
Corrected data, same shape as input array
Raises
------
ValueError
Raised if the cropped channels are greater than ``n_channels``.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import crop_subband_edges
>>> import numpy as np
>>> result = crop_subband_edges(
data=np.ones((2, 10)),
n_channels=5,
lower_edge_channels=1, # set to NaN the first channel of each subband
higher_edge_channels=0
)
>>> print(result)
[[nan 1. 1. 1. 1. nan 1. 1. 1. 1.]
[nan 1. 1. 1. 1. nan 1. 1. 1. 1.]]
>>> result = crop_subband_edges(
data=np.ones((2, 10)),
n_channels=5,
lower_edge_channels=0,
higher_edge_channels=2 # set to NaN the last 2 channels of each subband
)
>>> print(result)
[[ 1. 1. 1. nan nan 1. 1. 1. nan nan]
[ 1. 1. 1. nan nan 1. 1. 1. nan nan]]
See Also
--------
:func:`~nenupy.io.tf_utils.remove_channels_per_subband`
"""
log.info("Removing edge channels...")
if lower_edge_channels + higher_edge_channels >= n_channels:
raise ValueError(
f"{lower_edge_channels + higher_edge_channels} channels to crop out of {n_channels} channels subbands."
)
original_shape = data.shape
n_times = original_shape[0]
n_freqs = original_shape[1]
data = reshape_to_subbands(data=data, n_channels=n_channels)
# Set to NaN edge channels
data[:, :, :lower_edge_channels, ...] = np.nan # lower edge
data[:, :, n_channels - higher_edge_channels :, ...] = np.nan # upper edge
data = data.reshape((n_times, n_freqs) + original_shape[2:])
log.info(
f"\t{lower_edge_channels} lower and {higher_edge_channels} higher "
"band channels have been set to NaN at the subband edges."
)
return data
# ============================================================= #
# --------------------- de_disperse_array --------------------- #
[docs]
def de_disperse_array(
data: np.ndarray,
frequencies: u.Quantity,
time_step: u.Quantity,
dispersion_measure: u.Quantity,
) -> np.ndarray:
"""De-disperse in time an array ``data`` whose first two
dimensions are time and frequency respectively. The array
must be regularly sampled in time. The de-dispersion is made
relatively to the highest frequency using :func:`~nenupy.astro.astro_tools.dispersion_delay`.
De-dedispersed array is filled with ``NaN`` in time-frequency places where the
shifted values were.
Parameters
----------
data : :class:`~numpy.ndarray`
Data array to de-disperse, its shape must be (time, frequency, (polarizations)).
frequencies : :class:`~astropy.units.Quantity`
1D array of frequencies corresponding to the second
dimension of ``data``.
time_step : :class:`~astropy.units.Quantity`
Time step between two spectra.
dispersion_measure : :class:`~astropy.units.Quantity`
Dispersion Measure (in pc/cm3).
Returns
-------
:class:`~numpy.ndarray`
De-dispersed data.
Raises
------
Exception
Raised if the data dimension is less than 2.
ValueError
Raised if the ``frequencies`` array does not match dimension 1 of ``data``.
Examples
--------
.. code-block:: python
:emphasize-lines: 7,26,27,28,29,30,31
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> import astropy.units as u
>>> from astropy.time import Time, TimeDelta
>>> from nenupy.astro import dispersion_delay
>>> from nenupy.io.tf_utils import de_disperse_array
>>> n_times = 100
>>> n_freqs = 75
>>> n_pols = 1
>>> data_shape = (n_times, n_freqs, n_pols)
>>> # Build fake dispersed data
>>> dm = 5 * u.pc / (u.cm**3)
>>> dt = TimeDelta(1, format="sec")
>>> frequencies = np.linspace(20, 80, n_freqs) * u.MHz
>>> times = Time("2024-01-01 00:00:00") + np.arange(n_times) * dt
>>> delay = dispersion_delay(frequency=frequencies, dispersion_measure=dm)
>>> time_delay_idx = np.argmin(np.abs((times - times[0]).sec[:, None] - delay.to_value(u.s)[None, :]), axis=0)
>>> dispersed_data = np.ones((data_shape))
>>> dispersed_data[time_delay_idx + 5, np.arange(n_freqs)] += 10
>>> dispersed_data[time_delay_idx + 20, np.arange(n_freqs)] += 10
>>> # Correct the dispersion
>>> de_dispersed_data = de_disperse_array(
data=dispersed_data,
frequencies=frequencies,
time_step=dt,
dispersion_measure=dm
)
>>> # Plot the comparison
>>> fig = plt.figure(figsize=(10, 4))
>>> axes = fig.subplots(nrows=1, ncols=2)
>>> im_0 = axes[0].pcolormesh(times.datetime, frequencies.to_value(u.MHz), dispersed_data[:, :, 0].T)
>>> im_1 = axes[1].pcolormesh(times.datetime, frequencies.to_value(u.MHz), de_dispersed_data[:, :, 0].T)
>>> axes[0].set_ylabel("Frequency (MHz)")
>>> axes[0].set_xlabel("Time")
>>> axes[0].tick_params(axis="x", labelrotation=45)
>>> axes[1].set_xlabel("Time")
>>> axes[1].tick_params(axis="x", labelrotation=45)
.. figure:: ../_images/io_images/dedispersion.png
:width: 650
:align: center
See Also
--------
:meth:`~nenupy.io.tf.TFTask.de_disperse`, :func:`~nenupy.astro.astro_tools.dispersion_delay`
"""
log.info(f"De-dispersing data by DM={dispersion_measure.to(u.pc/u.cm**3)}...")
if data.ndim < 2:
raise Exception(
f"Input data is {data.shape}. >2D array is required "
"(time, frequency, ...)."
)
if data.shape[1] != frequencies.size:
raise ValueError(
f"The size of frequencies ({frequencies.size}) does "
f"not match dimension 1 of data ({data.shape[1]})."
)
# Compute the relative delays
delays = dispersion_delay(
frequency=frequencies, dispersion_measure=dispersion_measure
)
delays -= dispersion_delay(
frequency=frequencies.max(), dispersion_measure=dispersion_measure
)
# Convert the delays into indices
cell_delays = np.round((delays / time_step).decompose().to_value()).astype(int)
# Shift the array in time
# dedispersed_data = data.copy()
# time_size = data.shape[0]
# for i in range(frequencies.size):
# dedispersed_data[:, i, ...] = np.roll(data[:, i, ...], -cell_delays[i], 0)
# # Mask right edge of dynspec
# dedispersed_data[time_size - cell_delays[i] :, i, ...] = np.nan
n_times = data.shape[0]
tf_shape = data.shape[:2]
pol_shape = data.shape[2:]
indices_to_keep = np.hstack([tt + np.arange(n_times - tt) + (i * n_times) for i, tt in enumerate(cell_delays)])
new_indices = indices_to_keep - cell_delays[(indices_to_keep//n_times)]
dedispersed_data = np.empty(data.shape).reshape((np.prod(tf_shape),) + pol_shape)
dedispersed_data[...] = np.nan
dedispersed_data[new_indices, ...] = data.reshape((np.prod(tf_shape),) + pol_shape, order="F")[indices_to_keep, ...] # data.ravel(order="F")[indices_to_keep]
dedispersed_data = dedispersed_data.reshape(data.shape, order="F")
log.info("\tDone de-dispersing.")
return dedispersed_data
# ============================================================= #
# ---------------------- de_faraday_data ---------------------- #
[docs]
def de_faraday_data(
data: np.ndarray, frequency: u.Quantity, rotation_measure: u.Quantity
) -> np.ndarray:
r"""Correct the data from Faraday Rotation effect.
Linearly polarized light travelling through a magnetized plasma is subject to
a rotation of its polarization direction due to charged particles
(mostly dominated by electrons) reacting to and influencing
differentially the magnetic field components of the electromagnetic
radiation.
This function computes the chromatic Faraday rotation angle to correct for
:math:`\theta (\nu)`
with :func:`~nenupy.astro.astro_tools.faraday_angle`, given the
``rotation_measure`` of the intervening plasma. It is equivalent as
computing :math:`\Delta \theta = | \theta (\nu_{\rm ref}) - \theta (\nu) |`, i.e., the difference of rotation angle
where every ``frequency`` is compared to an 'infinite' frequency (since
:math:`\theta \propto \nu^{-2}`).
.. math::
\mathbf{d}_{\rm corrected}(t, \nu) =
\mathbf{R}(\nu)
\mathbf{d}(t, \nu)
\mathbf{R}(\nu)^\top,
where
.. math::
\mathbf{R}(\nu) = \begin{pmatrix}
\cos \theta (\nu) & - \sin \theta (\nu) \\
\sin \theta (\nu) & \cos \theta (\nu)
\end{pmatrix}
is the Faraday rotation matrix and
.. math::
\mathbf{d}(t, \nu) = \begin{pmatrix}
e_{\rm x}\overline{e_{\rm x}} & e_{\rm x}\overline{e_{\rm y}} \\
e_{\rm y}\overline{e_{\rm x}} & e_{\rm y}\overline{e_{\rm y}}
\end{pmatrix}(t, \nu),
and similarly :math:`\mathbf{d}_{\rm corrected}` are
the ``data`` and corrected data returned by this function, shaped as
Jones matrices of the signal. :math:`t` and :math:`\nu` are the time
and frequency.
Parameters
----------
frequency : :class:`~astropy.units.Quantity`
Light frequency (in units equivalent to MHz).
data : :class:`~numpy.ndarray`
Data in Jones format to be corrected. It needs to be shaped like (time, frequency, 2, 2)
rotation_measure : :class:`~astropy.units.Quantity`
Rotation measure (in units equivalent to rad/m2).
Returns
-------
:class:`~numpy.ndarray`
Faraday rotation corrected data.
Raises
------
Exception
Raised if the data dimensions do not meet the requirements.
Examples
--------
.. code-block:: python
>>> import numpy as np
>>> import astropy.units as u
>>> import matplotlib.pyplot as plt
>>> from nenupy.io.tf_utils import de_faraday_data
>>> times = np.arange(10)
>>> frequencies = np.linspace(52, 62, 100) * u.MHz
>>> linearly_polarized_light = 0.5 * np.array([[1, 1], [1, 1]]) # 45 deg
>>> linearly_polarized_light = np.tile(linearly_polarized_light, (times.size, frequencies.size, 1, 1))
>>> stokes_u = linearly_polarized_light[..., 0, 1] * 2
>>> faraday_rotated_light = de_faraday_data(
data=linearly_polarized_light,
frequency=frequencies,
rotation_measure=5 * u.rad / u.m**2
)
>>> faraday_stokes_u = faraday_rotated_light[..., 0, 1] * 2
>>> fig = plt.figure(figsize=(10, 4))
>>> axes = fig.subplots(nrows=1, ncols=2)
>>> im_0 = axes[0].pcolormesh(times, frequencies.to_value(u.MHz), stokes_u.T)
>>> im_1 = axes[1].pcolormesh(times, frequencies.to_value(u.MHz), faraday_stokes_u.T)
>>> cbar_0 = plt.colorbar(im_0)
>>> cbar_1 = plt.colorbar(im_1)
>>> cbar_1.set_label(r"Stokes U")
>>> axes[0].set_ylabel("Frequency (MHz)")
>>> axes[0].set_xlabel("Time (arbitrary units)")
>>> axes[1].set_xlabel("Time (arbitrary units)")
.. figure:: ../_images/io_images/defarday_example.png
:width: 650
:align: center
See Also
--------
:func:`~nenupy.astro.astro_tools.faraday_angle`, :meth:`~nenupy.io.tf.TFTask.correct_faraday_rotation`
"""
log.info("Correcting for Faraday rotation...")
# Check the dimensions
if (data.ndim != 4) or (data.shape[1:] != (frequency.size, 2, 2)):
raise Exception("Wrong data dimensions!")
# Computing the Faraday angles compared to infinite frequency
log.info(
f"\tComputing {frequency.size} Faraday rotation angles at the RM={rotation_measure}..."
)
rotation_angle = faraday_angle(
frequency=frequency, rotation_measure=rotation_measure, inverse=True
).to_value(u.rad)
log.info("\tApplying Faraday rotation Jones matrices...")
cosa = np.cos(rotation_angle)
sina = np.sin(rotation_angle)
jones = np.transpose(np.array([[cosa, -sina], [sina, cosa]]), (2, 0, 1))
jones_transpose = np.transpose(jones, (0, 2, 1))
return np.matmul(np.matmul(jones, data), jones_transpose)
# ============================================================= #
# ----------------------- get_bandpass ------------------------ #
[docs]
def get_bandpass(n_channels: int) -> np.ndarray:
"""Compute the theoretical bandpass response of a sub-band.
Function and coefficient developped/computed by C. Viou.
Parameters
----------
n_channels : int
Number of channels per sub-band
Returns
-------
:class:`~numpy.ndarray`
The bandpass response of a subband
Raises
------
ValueError
Raised if ``n_channels`` is lower than 16 or is odd.
Example
-------
.. code-block:: python
>>> from nenupy.io.tf_utils import get_bandpass
>>> bp = get_bandpass(n_channels=32)
"""
kaiser_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "bandpass_coeffs.dat"
)
kaiser = np.loadtxt(kaiser_file)
if (n_channels < 16) or (n_channels%2 != 0):
raise ValueError("n_channels cannot be lower than 16 and must be even.")
n_tap = 16
over_sampling = n_channels // n_tap
n_fft = over_sampling * kaiser.size
g_high_res = np.fft.fft(kaiser, n_fft)
mid = n_channels // 2
middle = np.r_[g_high_res[-mid:], g_high_res[:mid]]
right = g_high_res[mid : mid + n_channels]
left = g_high_res[-mid - n_channels : -mid]
midsq = np.abs(middle) ** 2
leftsq = np.abs(left) ** 2
rightsq = np.abs(right) ** 2
g = 2**25 / np.sqrt(midsq + leftsq + rightsq)
return g**2.0
# def smooth_subbands(data: np.ndarray, channels: int) -> np.ndarray:
# from scipy.signal import savgol_filter
# n_subbands = int(data.shape[1] / channels)
# subband_shape = (n_subbands, channels) + data.shape[2:]
# median_frequency_profile = np.nanmedian(data, axis=0)
# # Compute the Savgol filter of the frequency profile on a low order polynomial
# # The window is also of the order of the subband as we don't expect much change from one to another
# smoothed_fprofile = savgol_filter(
# median_frequency_profile,
# window_length=2 * channels + 1,
# polyorder=1
# )
# # smoothed_fprofile_sb = smoothed_fprofile.reshape(subband_shape)
# # Compute the shift between the median profile and the smoothed one
# relative_profile_difference = smoothed_fprofile / median_frequency_profile
# # Reshape per subband
# nan_values = np.isnan(median_frequency_profile)
# nan_values_sb = nan_values.reshape(subband_shape)
# difference_sb = relative_profile_difference.reshape(subband_shape)
# # Linear fit of the relative difference per subband
# non_nan_channels = np.sum(nan_values_sb, axis=1)
# linear_approx = np.arange(n_channels)[:, None] * (diff_sb[:, -1] - diff_sb[:, 0]) / n_channels + diff_sb[:, 0]
# ============================================================= #
# ---------------------- flatten_subband ---------------------- #
[docs]
def flatten_subband(data: np.ndarray, channels: int, smooth_frequency_profile: bool = False) -> np.ndarray:
"""Correct each subband if their levels is distributed like a sawtooth.
Sometimes, in particular for strong sources, the subbands adopt a strange behavior (an example is shown in :meth:`~nenupy.io.tf.TFTask.flatten_subband`).
This function aims at correcting this effect.
For a given subband:
* The median frequency profile is computed over the whole data time selection
* Affine function is computed based on two points taken as the medians of both half profiles of the subband
* This function is normalized by the median of the subband total profile
* The corrected subband is the original subband divided by this normalized linear profile
Parameters
----------
data : :class:`~numpy.ndarray`
The data to correct, shaped like (time, frequency, (polarizations))
channels : `int`
Number of channels per subband.
smooth_frequency_profile : `bool`, optional
Not yet finalized..., by default `False`
Returns
-------
:class:`~numpy.ndarray`
The corrected data, where the subbands are flattened.
Raises
------
ValueError
Raised if the ``data`` dimension 1, assumed to be the frequency, does not match ``channels``.
Examples
--------
.. code-block:: python
:emphasize-lines: 17,18,19,20
>>> from nenupy.io.tf_utils import flatten_subband
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> n_channels = 32
>>> n_subbands = 10
>>> rng = np.random.default_rng(12345)
>>> coeffs = rng.uniform(-0.1, 0.1, size=n_subbands)
>>> x_data = np.linspace(0, 10, n_channels * n_subbands)
>>> y_data = np.ones(x_data.size) # Idel data are flat
>>> saw_pattern = (coeffs[:, None]*np.linspace(-3, 3, n_channels) )[None, :]
>>> noise = rng.uniform(-0.05, 0.05, x_data.size)
>>> y_data_altered = (y_data.reshape((n_subbands, n_channels)) + saw_pattern).ravel() + noise
>>> y_data_corrected = flatten_subband(
data=y_data_altered.reshape(1, y_data_altered.size, 1, 1),
channels=n_channels
)
>>> fig = plt.figure(figsize=(10, 5))
>>> for sb in range(n_subbands):
>>> plt.axvline(x_data[sb*n_channels], color="black", linestyle=":")
>>> plt.plot(x_data, y_data, label="Original", linewidth=3, linestyle="--")
>>> plt.plot(x_data, y_data_altered, label="Altered")
>>> plt.plot(x_data, y_data_corrected[0, :, 0, 0], label="Corrected")
>>> plt.legend()
.. figure:: ../_images/io_images/flatten.png
:width: 650
:align: center
"""
# Check that data has not been altered, i.e. dimension 1 should be a multiple of channels
if data.shape[1] % channels != 0:
raise ValueError(
f"data's frequency dimension (of size {data.shape[1]}) is "
f"not a multiple of {channels=}. data's second "
"dimension should be of size number_of_subbands*number_of_channels."
)
n_subbands = int(data.shape[1] / channels)
pol_dims = data.ndim - 2 # the first two dimensions should be time and frequency
# Compute the median spectral profile (along the time axis)
median_frequency_profile = np.nanmedian(data, axis=0)
subband_shape = (n_subbands, channels) + data.shape[2:]
# Reshape to have the spectral profile as (subbands, channels, (polarizations...))
median_subband_profile = median_frequency_profile.reshape(subband_shape)
# # Select two data points (away from subband edges) that will be
# # used to compute the affine function that approximates each subband.
# ind1, ind2 = int(np.round(channels*1/3)), int(np.round(channels*2/3))
# # Get the y-values corresponding to these two indices, each y is of shape (subbands, (polarizations...))
# y1, y2 = median_subband_profile[:, ind1, ...], median_subband_profile[:, ind2, ...]
# Split the subband in two and compute the 2 medians that will be
# used to compute the affine function that approximates each subband.
ind1, ind2 = (
int(np.floor(channels / 2)) / 2,
channels - int(np.floor(channels / 2)) / 2,
)
y1 = np.nanmedian(
median_subband_profile[:, : int(np.floor(channels / 2)), ...], axis=1
)
y2 = np.nanmedian(
median_subband_profile[:, int(np.ceil(channels / 2)) :, ...], axis=1
)
# Smooth the future slopes by shifting upward or downards y1 and y2 with respect to the difference
# between them and the next point of the next subband
# This step may result in non desirable results if subbands are not contiguous or not belonging to the same beam
if smooth_frequency_profile:
log.warning("smooth_frequency_profile not fully tested yet, ignoring...")
# smooth_sb_diff = y1[1:, ...] - y2[:-1, ...]
# y1[1:, ...] += smooth_sb_diff / 2
# y2[:-1, ...] -= smooth_sb_diff / 2
# Compute the linear approximations of each subbands, linear_subbands's shape is (channels, subbands, (polarizations...))
x_values = np.arange(channels)[
(...,) + (np.newaxis,) * (pol_dims + 1)
] # +1 --> subbands
slope = (y2 - y1) / (ind2 - ind1)
linear_subbands = (x_values - ind1) * slope + y1 # linear equation
# Compute the subband mean value and the normalised linear subbands
subband_mean_values = np.nanmedian(
linear_subbands, axis=0
) # shape (subbands, (polarizations))
normalised_linear_subbands = np.swapaxes(
linear_subbands / subband_mean_values[None, ...], 0, 1
).reshape(data.shape[1:])
# Correct the data by the normalised linear subbands to flatten them
return data / normalised_linear_subbands[None, ...]
# ============================================================= #
# ------------------ mitigate_rfi_along_axis ------------------ #
[docs]
def mitigate_rfi_along_axis(data: da.Array, axis: int = 0, sigma_clip: float = 2., polynomial_degree: int = 4) -> da.Array:
"""Perform sigma clipping along a given axis.
Compute the median over one axis of a time-frequency dataset to obtain a background profile.
Fit a polynomial function (of degree ``polynomial_degree``) to this profile, which will serve as a smooth background profile (the decimal logarithm of the data is used for fitting).
Set every data points whose value is above ``sigma_clip`` times the standard dedviation of the background profile to NaN values.
Parameters
----------
data : :class:`~dask.Array`
The data to correct for RFI-like features, shaped like (time, frequency, (polarizations))
axis : `int`, optional
Axis over which the median is computed, if ``axis=0`` the median will be done along the time dimension to obtain a spectral profile along which the RFI would be determined (and similarly for ``axis=1``), by default 0
sigma_clip : `float`, optional
The factor above the background at which the data would be clipped, by default 2.
polynomial_degree : `int`, optional
Degree of the polynomial function fitted to the time or spectral profile in order to smooth them, by default 4
Returns
-------
:class:`~dask.Array`
The data whose outliers are set to NaN values.
Raises
------
Exception
If the input data is less than 2D.
IndexError
If the selected ``axis`` is neither 0 or 1.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import mitigate_rfi_along_axis
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> import matplotlib as mpl
>>> import dask.array as da
>>> polluted_data = 10**(4 + np.sin( np.pi * np.radians(np.arange(50))) + 0.5 * np.random.random((200, 50)))
>>> polluted_data[110:150, 20:22] += 10**9
>>> polluted_data[:, 10::20] += 10**7.5
>>> polluted_data[::20, :] += 10**10
>>> polluted_data[np.diag_indices(50)] += 10**9
>>> data_clean_axis_0 = mitigate_rfi_along_axis(
data=da.from_array(polluted_data),
axis=0,
sigma_clip=3,
polynomial_degree=5
)
>>> data_clean_axis_1 = mitigate_rfi_along_axis(
data=da.from_array(polluted_data),
axis=1,
sigma_clip=3,
polynomial_degree=5
)
>>> colors = mpl.colormaps.get_cmap("YlGnBu_r")
>>> colors.set_bad(color="red")
>>> vmin, vmax = 0, 8
>>> fig = plt.figure(figsize=(10, 4))
>>> axes = fig.subplots(nrows=1, ncols=3)
>>> axes[0].imshow(polluted_data.T, origin="lower", aspect="auto", cmap=colors, vmin=vmin, vmax=vmax)
>>> axes[0].set_title("Polluted data")
>>> axes[1].imshow(data_clean_axis_0.T, origin="lower", aspect="auto", cmap=colors, vmin=vmin, vmax=vmax)
>>> axes[1].set_title("Spectral mitigation")
>>> axes[2].imshow(data_clean_axis_1.T, origin="lower", aspect="auto", cmap=colors, vmin=vmin, vmax=vmax)
>>> axes[2].set_title("Temporal mitigation")
.. figure:: ../_images/io_images/rfi_mitigation.png
:width: 650
:align: center
"""
data = data.copy()
# Compute a profile of the data set along one axis (either time or frequency)
if data.ndim < 2:
raise Exception("The data should at least be 2D.")
if axis == 0:
log.info("RFI mitigation over the spectral profile.")
elif axis ==1:
log.info("RFI mitigation over the time profile.")
else:
raise IndexError("axis should either be 0 (median along time axis) or 1 (median along frequency axis).")
log.info("\tComputing the profile...")
with ProgressBar():
# Compute the profile of the opposite axis in order to flatten the data in the other dimension
other_profile = np.nanmedian(
np.abs(data), axis=0 if axis==1 else 1, keepdims=True
).compute()
other_profile /= np.nanmax(other_profile, keepdims=True) # normalize before dividing
flattened_data = np.abs(data / other_profile)
# Convert to log to ease the fitting process
# Compute the median profile
profile = np.log10(
np.nanmedian(flattened_data, axis=axis)
).compute()
# Make sure to not include NaN values for the fitting
not_nan = np.isfinite(profile) # may have serval other dimensions
not_nan = np.all(not_nan, axis=tuple(range(1, not_nan.ndim)))
# Fit every polarization-equivalent higher dimension by a polynomial function
# Flatten the dimensions to ease the process and reshape everything afterward
log.info("\tPolynomial fitting...")
original_shape = profile.shape
profile = profile.reshape((original_shape[0], int(np.prod(original_shape[1:]))))
xaxis = np.arange(original_shape[0])
fits_coeffs = np.array([np.polyfit(xaxis[not_nan], y[not_nan, ...], polynomial_degree) for y in profile.T])
smooth_profile = 10**np.array([np.poly1d(f)(xaxis) for f in fits_coeffs]).T.reshape(original_shape)
# Set data higher than background * sigma_clip to NaN
log.info(f"\tClipping out data above {sigma_clip} x background...")
if axis == 1:
smooth_profile = smooth_profile[:, None]
# profile_std = np.nanstd(smooth_profile, axis=0, keepdims=True)
# data[flattened_data > sigma_clip * smooth_profile] = np.nan
data[flattened_data > smooth_profile + sigma_clip * np.nanstd(smooth_profile)] = np.nan
return data
# ============================================================= #
# ------------------- plot_dynamic_spectrum ------------------- #
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.dates import AutoDateLocator, ConciseDateFormatter
[docs]
def plot_dynamic_spectrum(
data: np.ndarray,
time: Time,
frequency: u.Quantity,
fig: mpl.figure.Figure = None,
ax: mpl.axes.Axes = None,
figsize: Tuple[int, int] = (10, 5),
dpi: int = 200,
xlabel: str = None,
ylabel: str = None,
clabel: str = None,
title: str = None,
cmap: str = "YlGnBu_r",
norm: str = "linear",
vmin: float = None,
vmax: float = None
) -> Tuple[mpl.figure.Figure, mpl.axes.Axes]:
"""Plot a dynamic spectrum.
This function uses :func:`~matplotlib.pyplot.pcolormesh` in the background.
Parameters
----------
data : :class:`~numpy.ndarray`
Two-dimensional array, shaped like (time, frequency).
time : :class:`~astropy.time.Time`
One-dimensional time array.
frequency : :class:`~astropy.units.Quantity`
One-dimensional frequency array, if ``ylabel`` is `None`, the :attr:`~astropy.units.Quantity.unit` is automatically used to describe the axis.
fig : :class:`~matplotlib.figure.Figure`, optional
Matplotlib figure if already existing, by default `None`
ax : :class:`~matplotlib.axes.Axes`, optional
Matplotlib ax if already existing, by default `None`
figsize : Tuple[`int`, `int`], optional
Size of the figure in inches, by default (10, 5)
dpi : `int`, optional
Dots per inch (best quality is around 300), by default 200
xlabel : `str`, optional
Label of the x-axis (time), by default `None` (i.e., generic label)
ylabel : `str`, optional
Label of the y-axis (time), by default `None` (i.e., generic label)
clabel : `str`, optional
Label of the colorbar, by default `None` (i.e., empty)
title : `str`, optional
Title of the graph, by default `None` (i.e., empty)
cmap : `str`, optional
Colormap (see `matplotlib colormaps <link https://matplotlib.org/stable/users/explain/colors/colormaps.html>`_), by default "YlGnBu_r"
norm : `str`, optional
Normalization of the colorbar ('linear' or 'log'), by default "linear"
vmin : `float`, optional
Minimal data value to plot, by default `None`
vmax : `float`, optional
Maximal data value to plot, by default `None`
Returns
-------
Tuple[:class:`~matplotlib.figure.Figure`, :class:`~matplotlib.axes.Axes`]
The figure and ax objects
Raises
------
ValueError
Raised if ``norm`` does not match supported value.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import plot_dynamic_spectrum
>>> from astropy.time import Time, TimeDelta
>>> import astropy.units as u
>>> import matplotlib.pyplot as plt
>>> import numpy as np
>>> n_times = 20
>>> dt = TimeDelta(1800, format="sec")
>>> n_freqs = 10
>>> df = 1 * u.MHz
>>> fig, ax = plot_dynamic_spectrum(
data=np.arange(n_times * n_freqs).reshape((n_times, n_freqs)),
time=Time("2024-01-01 00:00:00") + np.arange(n_times) * dt,
frequency=50 * u.MHz + np.arange(n_freqs) * df,
clabel="my color bar",
norm="linear",
)
.. figure:: ../_images/io_images/plot_dynamic_spectrum.png
:width: 650
:align: center
Warning
-------
Do not forget to release the matplotlib cache (using
:func:`~matplotlib.pyplot.close`), either after calling several times
this method, and/or once you are done with your desired plot.
Otherwise you may suffer from significant performance issues
as the plots stacks in the memory...
>>> plt.close(fig)
or
>>> plt.close("all")
"""
if fig is None:
fig = plt.figure(figsize=figsize, dpi=dpi)
if ax is None:
ax = fig.add_subplot()
# Normalization
if norm == "linear":
if vmin is None:
vmin = data.min()
if vmax is None:
vmax = data.max()
elif norm == "log":
if vmin is None:
vmin = data[data > 0.].min()
if vmax is None:
vmax = data.max()
else:
raise ValueError("Invald norm, the following are supported: 'linear', 'log'.")
im = ax.pcolormesh(
time.datetime,
frequency.value,
data.T,
shading="nearest",
norm=norm,
cmap=cmap,
vmin=data.min() if vmin is None else vmin,
vmax=data.max() if vmax is None else vmax,
)
# Colorbar
cbar = plt.colorbar(im, pad=0.03)
cbar.set_label(clabel)
# Global
ax.minorticks_on()
ax.set_title(title)
# X axis
locator = AutoDateLocator()
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(ConciseDateFormatter(locator))
ax.set_xlabel(f"Time ({time.scale.upper()})" if xlabel is None else xlabel)
# Y axis
ax.set_ylabel(f"Frequency ({frequency.unit})" if ylabel is None else ylabel)
plt.tight_layout()
return fig, ax
def _plot_1d(
x: Any,
y: Any,
fig: mpl.figure.Figure = None,
ax: mpl.axes.Axes = None,
figsize: Tuple[int, int] = (10, 5),
dpi: int = 200,
xlabel: str = None,
ylabel: str = None,
title: str = None,
norm: str = "linear",
vmin: float = None,
vmax: float = None
) -> Tuple[mpl.figure.Figure, mpl.axes.Axes]:
if fig is None:
fig = plt.figure(figsize=figsize, dpi=dpi)
if ax is None:
ax = fig.add_subplot()
# Normalization
if norm == "linear":
if vmin is None:
vmin = y.min()
if vmax is None:
vmax = y.max()
elif norm == "log":
if vmin is None:
vmin = y[y > 0.].min()
if vmax is None:
vmax = y.max()
else:
raise ValueError("Invald norm, the following are supported: 'linear', 'log'.")
im = ax.plot(x, y)
# Global
ax.minorticks_on()
ax.set_title(title)
# X axis
ax.set_xlabel(xlabel)
# Y axis
ax.set_yscale(norm)
ax.set_ylim(vmin, vmax)
ax.set_ylabel(ylabel)
plt.tight_layout()
return fig, ax
[docs]
def plot_lightcurve(
time: Time,
data: np.ndarray,
fig: mpl.figure.Figure = None,
ax: mpl.axes.Axes = None,
figsize: Tuple[int, int] = (10, 5),
dpi: int = 200,
xlabel: str = None,
ylabel: str = None,
title: str = None,
norm: str = "linear",
vmin: float = None,
vmax: float = None
) -> Tuple[mpl.figure.Figure, mpl.axes.Axes]:
"""Plot a light curve.
This function uses :func:`~matplotlib.pyplot.plot` in the background.
Parameters
----------
time : :class:`~astropy.time.Time`
One-dimensional time array.
data : :class:`~numpy.ndarray`
One-dimensional array, shaped like (time,).
fig : :class:`~matplotlib.figure.Figure`, optional
Matplotlib figure if already existing, by default `None`
ax : :class:`~matplotlib.axes.Axes`, optional
Matplotlib ax if already existing, by default `None`
figsize : Tuple[`int`, `int`], optional
Size of the figure in inches, by default (10, 5)
dpi : `int`, optional
Dots per inch (best quality is around 300), by default 200
xlabel : `str`, optional
Label of the x-axis (time), by default `None` (i.e., generic label)
ylabel : `str`, optional
Label of the y-axis (time), by default `None` (i.e., generic label)
title : `str`, optional
Title of the graph, by default `None` (i.e., empty)
norm : `str`, optional
Normalization of the colorbar ('linear' or 'log'), by default "linear"
vmin : `float`, optional
Minimal data value to plot, by default `None`
vmax : `float`, optional
Maximal data value to plot, by default `None`
Returns
-------
Tuple[:class:`~matplotlib.figure.Figure`, :class:`~matplotlib.axes.Axes`]
The figure and ax objects
Raises
------
ValueError
Raised if ``norm`` does not match supported value.
"""
fig, ax = _plot_1d(
x=time.datetime,
y=data,
fig=fig,
ax=ax,
figsize=figsize,
dpi=dpi,
xlabel=f"Time ({time.scale.upper()})" if xlabel is None else xlabel,
ylabel=f"Amplitude" if ylabel is None else ylabel,
title=title,
norm=norm,
vmin=vmin,
vmax=vmax
)
# Time axis
locator = AutoDateLocator()
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(ConciseDateFormatter(locator))
return fig, ax
[docs]
def plot_spectrum(
frequency: u.Quantity,
data: np.ndarray,
fig: mpl.figure.Figure = None,
ax: mpl.axes.Axes = None,
figsize: Tuple[int, int] = (10, 5),
dpi: int = 200,
xlabel: str = None,
ylabel: str = None,
title: str = None,
norm: str = "linear",
vmin: float = None,
vmax: float = None
) -> Tuple[mpl.figure.Figure, mpl.axes.Axes]:
"""Plot a spectrum.
This function uses :func:`~matplotlib.pyplot.plot` in the background.
Parameters
----------
frequency : :class:`~astropy.units.Quantity`
One-dimensional frequency array, if ``ylabel`` is `None`, the :attr:`~astropy.units.Quantity.unit` is automatically used to describe the axis.
data : :class:`~numpy.ndarray`
One-dimensional array, shaped like (time,).
fig : :class:`~matplotlib.figure.Figure`, optional
Matplotlib figure if already existing, by default `None`
ax : :class:`~matplotlib.axes.Axes`, optional
Matplotlib ax if already existing, by default `None`
figsize : Tuple[`int`, `int`], optional
Size of the figure in inches, by default (10, 5)
dpi : `int`, optional
Dots per inch (best quality is around 300), by default 200
xlabel : `str`, optional
Label of the x-axis (time), by default `None` (i.e., generic label)
ylabel : `str`, optional
Label of the y-axis (time), by default `None` (i.e., generic label)
title : `str`, optional
Title of the graph, by default `None` (i.e., empty)
norm : `str`, optional
Normalization of the colorbar ('linear' or 'log'), by default "linear"
vmin : `float`, optional
Minimal data value to plot, by default `None`
vmax : `float`, optional
Maximal data value to plot, by default `None`
Returns
-------
Tuple[:class:`~matplotlib.figure.Figure`, :class:`~matplotlib.axes.Axes`]
The figure and ax objects
Raises
------
ValueError
Raised if ``norm`` does not match supported value.
"""
fig, ax = _plot_1d(
x=frequency.to_value(frequency.unit),
y=data,
fig=fig,
ax=ax,
figsize=figsize,
dpi=dpi,
xlabel=f"Frequency ({frequency.unit})" if xlabel is None else xlabel,
ylabel=f"Amplitude" if ylabel is None else ylabel,
title=title,
norm=norm,
vmin=vmin,
vmax=vmax
)
return fig, ax
# ============================================================= #
# -------------------- polarization_angle --------------------- #
[docs]
def polarization_angle(stokes_u: np.ndarray, stokes_q: np.ndarray) -> np.ndarray:
r"""Compute the linear polarization angle :math:`\psi`, given U and Q Stokes parameters.
.. math::
\psi = \frac{1}{2} \tan^{-1} \left( \frac{U}{Q} \right)
Parameters
----------
stokes_u : :class:`~numpy.ndarray`
Stokes U.
stokes_q : :class:`~numpy.ndarray`
Stokes Q.
Returns
-------
:class:`~numpy.ndarray`
Polarization angle.
"""
return 0.5 * np.arctan(stokes_u / stokes_q)
# ============================================================= #
# ------------------- rebin_along_dimension ------------------- #
[docs]
def rebin_along_dimension(
data: np.ndarray, axis_array: np.ndarray, axis: int, dx: float, new_dx: float
) -> Tuple[np.ndarray, np.ndarray]:
"""Rebin ``data`` along its ``axis`` dimension.
The corresponding ``axis_array`` is also rebinned.
To compute the rebin factor, this function takes as input the inital
resolution ``dx`` and the final resolution ``new_dx``.
If this process results in a new dimension that is not a multiple of
the rebin factor, the last samples are leftover and not averaged.
Parameters
----------
data : :class:`~numpy.ndarray`
Data array to be rebinned.
axis_array : :class:`~numpy.ndarray`
1D array, corresponding to the axis tp rebin.
axis : `int`
Index of the axis to rebin within ``data``.
dx : `float`
Inital resolution of ``axis_array``.
new_dx : `float`
Target resolution after rebinning. As the rebin factor is
taken as an integer value, the 'effective ``new_dx``' of the ``data``
after rebinning is the floor division btewwen ``dx`` and the target ``new_dx``.
Returns
-------
Tuple[:class:`~numpy.ndarray`, :class:`~numpy.ndarray`]
Rebinned axis and data.
Raises
------
ValueError
Raised if ``axis_array`` is not 1D, or if the ``data[axis]``'s
size does not match ``axis_array``, or if ``dx`` is greater than ``new_dx``.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import rebin_along_dimension
>>> new_axis, data_rebinned = rebin_along_dimension(
data=np.arange(11),
axis_array=np.arange(11),
axis=0,
dx=1,
new_dx=2.5 # this would result in a rebin-factor of 2.5//1.=2
)
>>> print(data_rebinned)
[0.5 2.5 4.5 6.5 8.5] # the last sample of data (i.e. 10) has not been considered
See Also
--------
:meth:`~nenupy.io.tf.TFTask.time_rebin`, :meth:`~nenupy.io.tf.TFTask.frequency_rebin`
"""
# Basic checks to make sure that dimensions are OK
if axis_array.ndim != 1:
raise ValueError("axis_array should be 1D.")
elif data.shape[axis] != axis_array.size:
raise ValueError(
f"Axis selected ({axis}) dimension {data.shape[axis]} does not match axis_array's shape {axis_array.shape}."
)
elif dx > new_dx:
raise ValueError("Rebin expect a `new_dx` value larger than original `dx`.")
initial_size = axis_array.size
bin_size = int(np.floor(new_dx / dx))
final_size = int(np.floor(initial_size / bin_size))
# If new_dx is bin_size than initial_size this would lead to a final size of 0
# Instead, we want to simply average the whole dimension at once
if final_size == 0:
final_size = 1
bin_size = initial_size
log.info(f"\tdx: {dx} | new_dx: {new_dx} -> rebin factor: {bin_size} (data size reached!).")
else:
log.info(f"\tdx: {dx} | new_dx: {new_dx} -> rebin factor: {bin_size}.")
leftovers = initial_size % (final_size * bin_size)
d_shape = data.shape
# Reshape the data and the axis to ease the averaging
data = data[
tuple(
[
slice(None) if i != axis else slice(None, initial_size - leftovers)
for i in range(len(d_shape))
]
)
].reshape(
d_shape[:axis]
+ (final_size, int((initial_size - leftovers) / final_size))
+ d_shape[axis + 1 :]
)
axis_array = axis_array[: initial_size - leftovers].reshape(
(final_size, int((initial_size - leftovers) / final_size))
)
# Average the data and the axis along the right dimension
data = np.nanmean(data, axis=axis + 1)
axis_array = np.nanmean(axis_array, axis=1)
log.info(f"\tData rebinned, last {leftovers} samples were not considered.")
return axis_array, data
# ============================================================= #
# ---------------- remove_channels_per_subband ---------------- #
[docs]
def remove_channels_per_subband(
data: np.ndarray, n_channels: int, channels_to_remove: Union[list, np.ndarray]
) -> np.ndarray:
"""Set channel indices of a time-frequency dataset to `NaN` values.
Each subband of ``data`` is determined thanks to ``n_channels``
(and the function :func:`~nenupy.io.tf_utils.reshape_to_subbands`).
Parameters
----------
data : :class:`~numpy.ndarray`
Data to be corrected, must be at least two-dimensional, the first two dimensions being respectively the time and the frequency
n_channels : `int`
Number of channels per subband
channels_to_remove : Union[`list`, :class:`~numpy.ndarray`]
Array of channel indices to set at `NaN` values, if `None` nothing is done and ``data`` is returned
Returns
-------
:class:`~numpy.ndarray`
Time-frequency correlations array, shaped as the original input, except that some channels are set to `NaN`.
Raises
------
TypeError
Raised if ``channels_to_remove`` is not of the correct type or cannot be converted to a :class:`~numpy.ndarray`.
IndexError
Raised if any of the indices listed in ``channels_to_remove`` does not correspond to the ``n_channels`` argument.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import remove_channels_per_subband
>>> import numpy as np
>>> result = remove_channels_per_subband(
data=np.ones((2, 10)),
n_channels=5,
channels_to_remove=[1, 3]
)
>>> print(result)
[[ 1. nan 1. nan 1. 1. nan 1. nan 1.]
[ 1. nan 1. nan 1. 1. nan 1. nan 1.]]
See Also
--------
:func:`~nenupy.io.tf_utils.crop_subband_edges`
"""
if channels_to_remove is None:
# Don't do anything
return data
elif not isinstance(channels_to_remove, np.ndarray):
try:
channels_to_remove = np.array(channels_to_remove)
except:
raise TypeError("channels_to_remove must be a numpy array.")
if len(channels_to_remove) == 0:
# Empty list, no channels to remove
return data
elif not np.all(np.isin(channels_to_remove, np.arange(-n_channels, n_channels))):
raise IndexError(
f"Some channel indices are outside the available range (n_channels={n_channels})."
)
log.info("Removing channels...")
original_shape = data.shape
data = reshape_to_subbands(data=data, n_channels=n_channels)
data[:, :, channels_to_remove, ...] = np.nan
data = data.reshape(original_shape)
log.info(f"\tChannels {channels_to_remove} set to NaN.")
return data
# ============================================================= #
# -------------------- reshape_to_subbands -------------------- #
[docs]
def reshape_to_subbands(data: np.ndarray, n_channels: int) -> np.ndarray:
"""Reshape a time-frequency data array by the sub-band dimension.
Given a ``data`` array with one frequency axis of size `n_frequencies`, this functions split this axis in two axes of size `n_subbands` and ``n_channels``.
Parameters
----------
data : :class:`~numpy.ndarray`
Time-frequency correlations array, its second dimension must be the frequencies.
n_channels : `int`
Number of channels per subband.
Returns
-------
:class:`~numpy.ndarray`
Data array, reshaped so that its frequency axis is split in subbands.
Raises
------
ValueError
Raised if ``n_channels`` does not notch the frequency dimension of ``data``.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import reshape_to_subbands
>>> import numpy as np
>>> data = np.arange(3*10).reshape((3, 10))
>>> result = reshape_to_subbands(data, 5)
>>> result.shape
(3, 2, 5)
"""
# Reshape the data array to isolate individual subbands
shape = data.shape
n_freqs = shape[1]
if n_freqs % n_channels != 0:
raise ValueError(
"The frequency dimension of `data` doesn't match the argument `n_channels`."
)
data = data.reshape((shape[0], int(n_freqs / n_channels), n_channels) + shape[2:])
return data
# ============================================================= #
# ---------------------- sort_beam_edges ---------------------- #
[docs]
def sort_beam_edges(beam_array: np.ndarray, n_channels: int) -> dict:
r"""Find out where in the frequency axis a beam starts and end.
Parameters
----------
beam_array : :class:`~numpy.ndarray`
Array of beams.
n_channels : `int`
Number of channels per subband.
Returns
-------
`dict`
Dictionnary of keys/values ``beam_index`` :math:`\leftrightarrow` ``(freq_axis_start_idx, freq_axis_end_idx)``
"""
indices = {}
unique_beams = np.unique(beam_array)
for b in unique_beams:
b_idx = np.where(beam_array == b)[0]
indices[str(b)] = (
b_idx[0] * n_channels, # freq start of beam
(b_idx[-1] + 1) * n_channels - 1, # freq stop of beam
)
return indices
# ============================================================= #
# ------------------ spectra_data_to_matrix ------------------- #
[docs]
def spectra_data_to_matrix(fft0: da.Array, fft1: da.Array) -> da.Array:
r"""Reshape the data stored in the .spectra file into Jones formalism:
.. math::
\mathbf{d}_{\rm J}(t, \nu) = \begin{pmatrix}
X\overline{X} & \Re(X\overline{Y}) - i (-\Im(X\overline{Y}))\\
\Re(X\overline{Y}) + i (-\Im(X\overline{Y})) & Y\overline{Y}
\end{pmatrix}
= \begin{pmatrix}
X\overline{X} & X\overline{Y}\\
Y\overline{X} & Y\overline{Y}
\end{pmatrix}
Parameters
----------
fft0 : :class:`~dask.array.Array`
The first data block in the .spectra files. Its last dimension corresponds to :math:`(X_rX_r+X_iX_i, Y_rY_r + Y_iY_i) \rightarrow (X\overline{X}, Y\overline{Y})`
fft1 : :class:`~dask.array.Array`
The second data block in the .spectra files. Its last dimension corresponds to :math:`(X_rY_r + X_iY_i, X_rY_i - X_iY_r) \rightarrow (\Re(X\overline{Y}), -\Im(X\overline{Y}))`
Returns
-------
:class:`~dask.array.Array`
Reshaped data :math:`\mathbf{d}_{\rm J}`.
"""
# fft0 = [XrXr+XiXi : YrYr+YiYi]
# fft1 = [XrYr+XiYi : XrYi-XiYr]
xx = fft0[..., 0] # XrXr + XiXi = XX*
yy = fft0[..., 1] # YrYr + YiYi = YY*
xy = fft1[..., 0] - 1j * fft1[..., 1] # XrYr + XiYi - i(XrYi - XiYr) = XY*
yx = fft1[..., 0] + 1j * fft1[..., 1] # XrYr + XiYi + i(XrYi - XiYr) = YX*
# row1 = da.stack(
# [fft0[..., 0], fft1[..., 0] - 1j * fft1[..., 1]], axis=-1 # XX # XY*
# )
# row2 = da.stack(
# [fft1[..., 0] + 1j * fft1[..., 1], fft0[..., 1]], axis=-1 # YX* # YY
# )
# Check using:
# xx = np.ones(30).reshape((3, 10))
# xy = 2 * np.ones(30).reshape((3, 10))
# yx = 3 * np.ones(30).reshape((3, 10))
# yy = 4 * np.ones(30).reshape((3, 10))
row1 = da.stack([xx, yx], axis=-1)
row2 = da.stack([xy, yy], axis=-1)
return da.stack([row1, row2], axis=-1)
# ============================================================= #
# -------------------- store_dask_tf_data --------------------- #
def _time_to_keywords(prefix: str, time: Time) -> dict:
"""Returns a dictionnary of keywords in the HDF5 format."""
return {
f"{prefix.upper()}_MJD": time.mjd,
f"{prefix.upper()}_TAI": time.tai.isot,
f"{prefix.upper()}_UTC": time.isot + "Z",
}
[docs]
def store_dask_tf_data(
file_name: str,
data: da.Array,
time: Time,
frequency: u.Quantity,
polarization: np.ndarray,
beam: int = 0,
stored_frequency_unit: str = "MHz",
mode="auto",
**metadata,
) -> None:
"""Store a `Dask <https://docs.dask.org/en/stable/>`_ array in a HDF5 file.
The array may be larger than memory, and may be the result of a pipeline.
The main metadata are present.
The file format is constructed similarly to what LOFAR is using.
Parameters
----------
file_name : `str`
Name of the HDF5 file to be stored, if the file already exists and ``mode='a'``, a new dataset with the name of ``beam`` is added.
data : :class:`~dask.array.Array`
The data to store shaped like (time, frequency, (polarizations...)).
time : :class:`~astropy.time.Time`
The time axis describing dimension 0 of ``data``.
frequency : :class:`~astropy.units.Quantity`
The frequency axis describing dimension 1 of ``data``.
polarization : :class:`~numpy.ndarray`
The polarization axis, time-frequency dynamic spectra from ``data`` will be stored, split base on polarization.
beam : `int`, optional
Index of the digital beam (on which the creation of a new dataset within the file is based), by default 0
stored_frequency_unit : `str`, optional
The unit in which the frequency is stored, by default "MHz"
mode : `str`, optional
The writing mode, available modes are 'a' (append), 'w' (write/overwrite), 'auto' (try to append when the file exists), by default "auto"
**metadata : `dict`, optional
Any key-value metadata the user wants to add, they will be stored in the main header.
Raises
------
ValueError
Raised if ``filename`` does not end with '.hdf5'.
Exception
Raised if trying to append data with a digital beam index already present in the file.
KeyError
Raised if the selected mode does not correspond to what is available.
Notes
-----
This method is called when the argument ``file_name`` of :meth:`~nenupy.io.tf.Spectra.get` is filled.
Examples
--------
.. code-block:: python
:emphasize-lines: 13,14,15,16,17,18,19
>>> import numpy as np
>>> import astropy.units as u
>>> from astropy.time import Time, TimeDelta
>>> import dask.array as da
>>> from nenupy.io.tf_utils import store_dask_tf_data
>>> import h5py
>>> n_times = 10
>>> n_freqs = 20
>>> polarizations = np.array(["I", "Q", "U"])
>>> data_shape = (n_times, n_freqs, polarizations.size)
>>> store_dask_tf_data(
file_name="test.hdf5",
data=da.arange(np.prod(data_shape)).reshape(data_shape),
time=Time("2024-01-01T00:00:00") + np.arange(n_times) * TimeDelta(10, format="sec"),
frequency=np.linspace(20, 80, n_freqs) * u.MHz,
polarization=polarizations,
)
>>> f = h5py.File("test.hdf5", "r")
>>> f.keys()
<KeysViewHDF5 ['SUB_ARRAY_POINTING_000']>
>>> analog_beam_dset = f['SUB_ARRAY_POINTING_000']
>>> analog_beam_dset.keys()
<KeysViewHDF5 ['BEAM_000']>
>>> digital_beam_dset = analog_beam_dset["BEAM_000"]
>>> digital_beam_dset.keys()
<KeysViewHDF5 ['COORDINATES', 'I', 'Q', 'U']>
>>> digital_beam_dset["I"].shape
(10, 20)
>>> digital_beam_dset["COORDINATES"].keys(), digital_beam_dset["COORDINATES"].attrs["units"]
(<KeysViewHDF5 ['frequency', 'time']>, array(['jd', 'MHz'], dtype=object))
>>> digital_beam_dset["COORDINATES"]["frequency"][:]
array([20. , 23.15789474, 26.31578947, 29.47368421, 32.63157895,
35.78947368, 38.94736842, 42.10526316, 45.26315789, 48.42105263,
51.57894737, 54.73684211, 57.89473684, 61.05263158, 64.21052632,
67.36842105, 70.52631579, 73.68421053, 76.84210526, 80. ])
"""
log.info(f"Storing the data in '{file_name}'")
# Check that the file_name has the correct extension
if not file_name.lower().endswith(".hdf5"):
raise ValueError(f"HDF5 files must ends with '.hdf5', got {file_name} instead.")
if mode.lower() == "auto":
if os.path.isfile(file_name):
mode = "a"
else:
mode = "w"
stored_freq_quantity = u.Unit(stored_frequency_unit)
frequency_min = frequency.min()
frequency_max = frequency.max()
with h5py.File(file_name, mode) as wf:
beam_group_name = f"BEAM_{beam:03}"
if mode == "w":
log.info("\tCreating a brand new file...")
# Update main attributes
wf.attrs.update(metadata)
wf.attrs["SOFTWARE_NAME"] = "nenupy"
# wf.attrs["SOFTWARE_VERSION"] = nenupy.__version__
wf.attrs["SOFTWARE_MAINTAINER"] = "alan.loh@obspm.fr"
wf.attrs["FILENAME"] = file_name
wf.attrs.update(_time_to_keywords("OBSERVATION_START", time[0]))
wf.attrs.update(_time_to_keywords("OBSERVATION_END", time[-1]))
wf.attrs["TOTAL_INTEGRATION_TIME"] = (time[-1] - time[0]).sec
wf.attrs["OBSERVATION_FREQUENCY_MIN"] = frequency_min.to_value(
stored_freq_quantity
)
wf.attrs["OBSERVATION_FREQUENCY_MAX"] = frequency_max.to_value(
stored_freq_quantity
)
wf.attrs["OBSERVATION_FREQUENCY_CENTER"] = (
(frequency_max + frequency_min) / 2
).to_value(stored_freq_quantity)
wf.attrs["OBSERVATION_FREQUENCY_UNIT"] = stored_frequency_unit
sub_array_group = wf.create_group(
"SUB_ARRAY_POINTING_000"
) # TODO modify if a Spectra file can be generated from more than 1 analog beam
elif mode == "a":
log.info("\tTrying to append data to existing file...")
sub_array_group = wf["SUB_ARRAY_POINTING_000"]
if beam_group_name in sub_array_group.keys():
raise Exception(
f"File '{file_name}' already contains '{beam_group_name}'."
)
else:
raise KeyError(f"Invalid mode '{mode}'. Select 'w' or 'a' or 'auto'.")
beam_group = sub_array_group.create_group(beam_group_name)
beam_group.attrs.update(_time_to_keywords("TIME_START", time[0]))
beam_group.attrs.update(_time_to_keywords("TIME_END", time[-1]))
beam_group.attrs["FREQUENCY_MIN"] = frequency_min.to_value(stored_freq_quantity)
beam_group.attrs["FREQUENCY_MAX"] = frequency_max.to_value(stored_freq_quantity)
beam_group.attrs["FREQUENCY_UNIT"] = stored_frequency_unit
coordinates_group = beam_group.create_group("COORDINATES")
# Set time and frequency axes
coordinates_group["time"] = time.jd
coordinates_group["time"].make_scale("Time (JD)")
coordinates_group["frequency"] = frequency.to_value(stored_freq_quantity)
coordinates_group["frequency"].make_scale(
f"Frequency ({stored_frequency_unit})"
)
coordinates_group.attrs["units"] = ["jd", stored_frequency_unit]
log.info("\tTime and frequency axes written.")
# Ravel the last polarization dimensions (above dim=2 -> freq)
data = np.reshape(data, data.shape[:2] + (-1,))
for pi in range(data.shape[-1]):
current_polar = polarization[pi].upper()
log.info(f"\tDealing with polarization '{current_polar}'...")
data_i = data[:, :, pi]
# data_group = beam_group.create_group(f"{current_polar.upper()}")
dataset = beam_group.create_dataset(
name=f"{current_polar}", shape=data_i.shape, dtype=data_i.dtype
)
dataset.dims[0].label = "time"
dataset.dims[0].attach_scale(coordinates_group["time"])
dataset.dims[1].label = "frequency"
dataset.dims[1].attach_scale(coordinates_group["frequency"])
with ProgressBar():
da.store(data_i, dataset, compute=True, return_stored=False)
log.info(f"\t'{file_name}' written.")
# ============================================================= #
# ------------------------ _Parameter ------------------------- #
class _TFParameter(ABC):
def __init__(
self,
name: str,
param_type: Any = None,
partial_type_kw: dict = None,
help_msg: str = "",
):
self.name = name
self.param_type = param_type
self.partial_type_kw = partial_type_kw
self.help_msg = help_msg
self._value = None
def __str__(self) -> str:
return f"{self.name}={self.value}"
@property
def value(self) -> Any:
return self._value
@value.setter
def value(self, v: Any):
if v is None:
# Fill value, don't check anything.
self._value = None
return
# Check that the value is of correct type, if not try to convert
v = self._type_check(v)
# Check that the value is within the expected boundaries
if not self.is_expected_value(v):
raise ValueError(f"Unexpected value of {self.name}. {self.help_msg}")
log.debug(f"Parameter '{self.name}' set to {v}.")
self._value = v
def info(self):
print(self.help_msg)
def _type_check(self, value: Any) -> Any:
if self.param_type is None:
# No specific type has been defined
return value
elif isinstance(value, self.param_type):
# Best scenario, the value is of expected type
return value
log.debug(
f"{self.name} is of type {type(value)}. "
f"Trying to convert it to {self.param_type}..."
)
try:
if not (self.partial_type_kw is None):
converted_value = partial(self.param_type, **self.partial_type_kw)(
value
)
log.debug(f"{self.name} set to {converted_value}.")
return converted_value
return self.param_type(value)
except:
raise TypeError(
f"Type of '{self.name}' should be {self.param_type}, got {type(value)} instead! {self.help_msg}"
)
@abstractmethod
def is_expected_value(self, value: Any) -> bool:
raise NotImplementedError(
f"Need to implement 'is_expected_value' in child class {self.__class__.__name__}."
)
class _FixedParameter(_TFParameter):
def __init__(self, name: str, value: Any = None):
super().__init__(name=name)
self._value = value
@property
def value(self) -> Any:
return self._value
@value.setter
def value(self, v: Any):
raise Exception(f"_FixedParameter {self.name}'s value attribute cannot be set.")
def is_expected_value(self, value: Any) -> bool:
raise Exception("This should not have been called!")
class _ValueParameter(_TFParameter):
def __init__(
self,
name: str,
default: Any = None,
param_type: Any = None,
min_val: Any = None,
max_val: Any = None,
resolution: Any = 0,
partial_type_kw: dict = None,
help_msg: str = "",
):
super().__init__(
name=name,
param_type=param_type,
partial_type_kw=partial_type_kw,
help_msg=help_msg,
)
self.min_val = min_val
self.max_val = max_val
self.resolution = resolution
self.value = default
def is_expected_value(self, value: Any) -> bool:
if not (self.min_val is None):
if value < self.min_val - self.resolution / 2:
log.error(
f"{self.name}'s value ({value}) is lower than the min_val {self.min_val}!"
)
return False
if not (self.max_val is None):
if value > self.max_val + self.resolution / 2:
log.error(
f"{self.name}'s value ({value}) is greater than the max_val {self.max_val}!"
)
return False
return True
class _RestrictedParameter(_TFParameter):
def __init__(
self,
name: str,
default: Any = None,
param_type: Any = None,
available_values: list = [],
help_msg: str = "",
):
super().__init__(name=name, param_type=param_type, help_msg=help_msg)
self.available_values = available_values
self.value = default
def is_expected_value(self, value: Any) -> bool:
if not hasattr(value, "__iter__"):
value = [value]
for val in value:
if val not in self.available_values:
log.error(f"{self.name} value must be one of {self.available_values}.")
return False
return True
class _BooleanParameter(_TFParameter):
def __init__(self, name: str, default: Any = None, help_msg: str = ""):
super().__init__(name=name, param_type=None, help_msg=help_msg)
self.value = default
def is_expected_value(self, value: bool) -> bool:
if not isinstance(value, bool):
log.error(f"{self.name}'s value should be a Boolean, got {value} instead.")
return False
return True
# ============================================================= #
# ------------------- TFPipelineParameters -------------------- #
[docs]
class TFPipelineParameters:
"""Class to handle the parameters used for the pipeline processing of the NenuFAR time-frequency data.
Examples
--------
.. code-block:: python
>>> from nenupy.io.tf_utils import TFPipelineParameters, _ValueParameter, _BooleanParameter
>>> parameters = TFPipelineParameters(
_ValueParameter(
name="param_1",
default=1.,
param_type=float,
min_val=0.1,
max_val=2.0
),
_BooleanParameter(
name="param_2",
default=True
)
)
Parameters can be set, an error is sent if the value is outside the pre-defined range:
.. code-block:: python
>>> parameters["param_1"] = 1.5
>>> parameters["param_1"]
1.5
>>> parameters["param_1"] = 3
ERROR: param_1's value (3.0) is greater than the max_val 2.0!
See Also
--------
:ref:`custom_pipeline_param_doc`
"""
[docs]
def __init__(self, *parameters):
"""Generate an instance of :class:`~nenupy.io.tf_utils.TFPipelineParameters`.
Parameters
----------
*parameters : :class:`nenupy.io.tf_utils._TFParameter`
Parameter description.
"""
self.parameters = parameters
self._original_parameters = copy.deepcopy(parameters)
def __setitem__(self, name: str, value: Any):
"""Set the value of a parameter.
Parameters
----------
name : `str`
Name of the parameter.
value : `Any`
New value of the paramater.
"""
param = self._get_parameter(name)
param.value = value
def __getitem__(self, name: str) -> _TFParameter:
return self._get_parameter(name).value
def __repr__(self) -> str:
message = "\n".join([str(param) for param in self.parameters])
return message
[docs]
def info(self) -> str:
"""Display information on each parameter and its current value.
Returns
-------
`str`
The info message.
Example
-------
.. code-block:: python
>>> print(parameters.info())
param_1: 1.0
param_2: True
"""
message = ""
for param in self.parameters:
message += f"{param.name}: {param.value}\n"
return message
def _get_parameter(self, name: str):
for param in self.parameters:
if param.name == name.lower():
return param
else:
param_names = [param.name for param in self.parameters]
raise KeyError(
f"Unknown parameter '{name}'! Available parameters are {param_names}."
)
[docs]
def reset(self) -> None:
"""Reset all parameters to their original values.
Example
-------
.. code-block:: python
>>> print(parameters.info())
param_1: 1.0
param_2: True
>>> parameters["param_1"] = 1.5
>>> print(parameters.info())
param_1: 1.5
param_2: True
>>> parameters.reset()
>>> print(parameters.info())
param_1: 1.0
param_2: True
"""
self.parameters = copy.deepcopy(self._original_parameters)
[docs]
def copy(self):
"""Copy the :class:`~nenupy.io.tf_utils.TFPipelineParameters` instance.
Returns
-------
:class:`~nenupy.io.tf_utils.TFPipelineParameters`
The copy.
"""
return copy.deepcopy(self)
[docs]
@classmethod
def set_default(
cls,
time_min: Time,
time_max: Time,
freq_min: u.Quantity,
freq_max: u.Quantity,
beams: list,
channels: int,
dt: u.Quantity,
df: u.Quantity,
):
"""Set the parameters to their default values.
It auto generates every instance of :class:`nenupy.io.tf_utils._TFParameter` for each available default parameter.
Some informations are required in ordedr to put limits to these parameter values.
Parameters
----------
time_min : :class:`~astropy.time.Time`
Minimal time that can be set to either ``'tmin'`` or ``'tmax'``.
time_max : :class:`~astropy.time.Time`
Maximal time that can be set to either ``'tmin'`` or ``'tmax'``.
freq_min : :class:`~astropy.units.Quantity`
Minimal frequency that can be set to either ``'fmin'`` or ``'fmax'``.
freq_max : :class:`~astropy.units.Quantity`
Maximal frequency that can be set to either ``'fmin'`` or ``'fmax'``.
beams : `list`
List of available beam indices.
channels : `int`
Number of channels per subband.
dt : :class:`~astropy.units.Quantity`
Time resolution.
df : :class:`~astropy.units.Quantity`
Frequency resolution.
Returns
-------
:class:`~nenupy.io.tf_utils.TFPipelineParameters`
Default parameters.
Example
-------
.. code-block:: python
>>> from nenupy.io.tf_utils import TFPipelineParameters
>>> import astropy.units as u
>>> from astropy.time import Time
>>> default_params = TFPipelineParameters.set_default(
time_min=Time("2024-01-01 12:00:00"),
time_max=Time("2024-01-01 13:30:00"),
freq_min=20*u.MHz,
freq_max=80*u.MHz,
beams=[0, 1],
channels=32,
dt=84 * u.ms,
df=6 * u.kHz,
)
>>> print(default_params.info())
channels: 32
dt: 84.0 ms
df: 6.0 kHz
tmin: 2024-01-01 12:00:00.000
tmax: 2024-01-01 13:30:00.000
fmin: 20.0 MHz
fmax: 80.0 MHz
beam: 0
dispersion_measure: None
rotation_measure: None
rebin_dt: None
rebin_df: None
remove_channels: None
skycoord: None
calib_dt: None
dreambeam_parallactic: True
stokes: I
ignore_volume_warning: False
overwrite: False
smooth_frequency_profile: False
"""
return cls(
_FixedParameter(name="channels", value=channels),
_FixedParameter(name="dt", value=dt),
_FixedParameter(name="df", value=df),
_ValueParameter(
name="tmin",
default=time_min,
param_type=Time,
min_val=time_min,
max_val=time_max,
resolution=dt,
partial_type_kw={"precision": 7},
help_msg="Lower edge of time selection, can either be given as an astropy.Time object or an ISOT/ISO string.",
),
_ValueParameter(
name="tmax",
default=time_max,
param_type=Time,
min_val=time_min,
max_val=time_max,
resolution=dt,
partial_type_kw={"precision": 7},
help_msg="Upper edge of time selection, can either be given as an astropy.Time object or an ISOT/ISO string.",
),
_ValueParameter(
name="fmin",
default=freq_min,
param_type=u.Quantity,
min_val=freq_min,
max_val=freq_max,
resolution=df,
partial_type_kw={"unit": "MHz"},
help_msg="Lower frequency boundary selection, can either be given as an astropy.Quantity object or float (assumed to be in MHz).",
),
_ValueParameter(
name="fmax",
default=freq_max,
param_type=u.Quantity,
min_val=freq_min,
max_val=freq_max,
resolution=df,
partial_type_kw={"unit": "MHz"},
help_msg="Higher frequency boundary selection, can either be given as an astropy.Quantity object or float (assumed to be in MHz).",
),
_RestrictedParameter(
name="beam",
default=beams[0],
param_type=int,
available_values=beams,
help_msg="Beam selection, a single integer corresponding to the index of a recorded numerical beam is expected.",
),
_ValueParameter(
name="dispersion_measure",
default=None,
param_type=u.Quantity,
partial_type_kw={"unit": "pc cm-3"},
help_msg="Enable the correction by this Dispersion Measure, can either be given as an astropy.Quantity object or float (assumed to be in pc/cm^3).",
),
_ValueParameter(
name="rotation_measure",
default=None,
param_type=u.Quantity,
partial_type_kw={"unit": "rad m-2"},
help_msg="Enable the correction by this Rotation Measure, can either be given as an astropy.Quantity object or float (assumed to be in rad/m^2).",
),
_ValueParameter(
name="rebin_dt",
default=None,
param_type=u.Quantity,
min_val=dt,
partial_type_kw={"unit": "s"},
help_msg="Desired rebinning time resolution, can either be given as an astropy.Quantity object or float (assumed to be in sec).",
),
_ValueParameter(
name="rebin_df",
default=None,
param_type=u.Quantity,
min_val=df,
partial_type_kw={"unit": "kHz"},
help_msg="Desired rebinning frequency resolution, can either be given as an astropy.Quantity object or float (assumed to be in kHz).",
),
# _BooleanParameter(
# name="correct_bandpass", default=True,
# help_msg="Enable/disable the correction of each subband bandpass."
# ),
# _BooleanParameter(
# name="correct_jump", default=False,
# help_msg="Enable/disable the auto-correction of 6-min jumps, the results of this process are highly data dependent."
# ),
_RestrictedParameter(
name="remove_channels",
default=None,
param_type=list,
available_values=np.arange(-channels, channels),
help_msg="List of subband channels to remove, e.g. `remove_channels=[0,1,-1]` would remove the first, second (low-freq) and last channels from each subband.",
),
_ValueParameter(
name="skycoord",
default=None,
param_type=SkyCoord,
help_msg="Tracked celestial coordinates used for beam and polarization corrections, an astropy.SkyCoord object is expected.",
),
_ValueParameter(
name="calib_dt",
default=None,
param_type=u.Quantity,
partial_type_kw={"unit": "s"},
help_msg="Time resolution used for beam and polarization corrections, an astropy.Quantity or a float (assumed in seconds) are expected.",
),
_BooleanParameter(
name="dreambeam_parallactic",
default=True,
help_msg="DreamBeam parallactic angle correction (along with 'dreambeam_skycoord' and 'dreambeam_dt'), a boolean is expected.",
),
_ValueParameter(
name="stokes",
default="I",
param_type=None,
help_msg="Stokes parameter selection, can either be given as a string or a list of strings, e.g. ['I', 'Q', 'V/I'].",
),
_BooleanParameter(
name="ignore_volume_warning",
default=False,
help_msg="Ignore or not (default value) the limit regarding output data volume.",
),
_BooleanParameter(
name="overwrite",
default=False,
help_msg="Overwrite or not (default value) the resulting HDF5 file.",
),
_BooleanParameter(
name="smooth_frequency_profile",
default=False,
help_msg="Smooth the adjacent subbands (option of task `flatten_subband`).",
),
_ValueParameter(
name="lna_filter",
default=None,
param_type=int,
help_msg="NenuFAR LNA filter, this would modify the shape of the bandpass response.",
),
)
# ============================================================= #
# ---------------------- ReducedSpectra ----------------------- #
[docs]
class ReducedSpectraSlice:
[docs]
def __init__(self, time: Time, frequency: u.Quantity, polarization: np.ndarray, data: h5py._hl.dataset.Dataset):
self.time = time.reshape((1,)) if time.isscalar else time
self.frequency = frequency.reshape((1,)) if frequency.isscalar else frequency
self.polarization = np.array([polarization]) if np.isscalar(polarization) else np.array(polarization)
if (self.time.size == 0) or (self.frequency.size == 0) or (self.polarization.size == 0):
raise IndexError("Selection led to 0 a zero element dimension. Keep a minimum of one element.")
self.data = data.reshape((self.time.size, self.frequency.size, self.polarization.size))
def __getitem__(self, data_slice: slice):
if not isinstance(data_slice, tuple):
data_slice = (data_slice,)
elif len(data_slice) > 3:
raise IndexError("Maximal slice dimension is 3.")
time_slice = data_slice[0]
try:
frequency_slice = data_slice[1]
except IndexError:
frequency_slice = slice(None, None, None)
try:
polarization_slice = data_slice[2]
except IndexError:
polarization_slice = slice(None, None, None)
return ReducedSpectraSlice(
time=self.time[time_slice],
frequency=self.frequency[frequency_slice],
polarization=self.polarization[polarization_slice],
data=self.data[time_slice, frequency_slice, polarization_slice]
)
@property
def shape(self) -> Tuple[int]:
"""_summary_
Returns
-------
Tuple[int]
_description_
"""
return self.data.shape
[docs]
def plot(self, polarization: str = None, **kwargs) -> None:
"""_summary_
Parameters
----------
polarization : str, optional
_description_, by default None
Returns
-------
_type_
_description_
See Also
--------
:func:`~nenupy.io.tf_utils.plot_dynamic_spectrum`, :func:`~nenupy.io.tf_utils.plot_lightcurve`, :func:`~nenupy.io.tf_utils.plot_spectrum`
"""
# Select a single polarization
if polarization is None:
polarization_id = 0
else:
try:
polarization_id = np.argwhere(self.polarization==polarization)[0, 0]
except IndexError:
log.error(f"Unable to find polarization '{polarization}'.")
polarization_id = 0
log.info(f"Selected polarization: '{self.polarization[polarization_id]}'.")
# Plot spectrum
if (self.data.shape[0] == 1) & (self.data.shape[1] > 1):
fig, ax = plot_spectrum(
data=self.data[0, :, polarization_id].ravel(),
frequency=self.frequency,
**kwargs
)
# Plot lightcurve
elif (self.data.shape[0] > 1) & (self.data.shape[1] == 1):
fig, ax = plot_lightcurve(
data=self.data[:, 0, polarization_id].ravel(),
time=self.time,
**kwargs
)
# Plot dynamic spectrum
elif (self.data.shape[0] > 1) & (self.data.shape[1] > 1):
fig, ax = plot_dynamic_spectrum(
data=self.data[:, :, polarization_id],
time=self.time,
frequency=self.frequency,
**kwargs
)
return fig, ax
[docs]
def multi_plot(self, nrows: int, ncols: int, fig: mpl.figure.Figure = None, figsize: Tuple[int, int] = None, **kwargs):
"""_summary_
Parameters
----------
nrows : int
_description_
ncols : int
_description_
fig : mpl.figure.Figure, optional
_description_, by default None
figsize : Tuple[int, int], optional
_description_, by default None
Example
-------
.. code-block:: python
>>> import numpy as np
>>> from astropy.time import Time, TimeDelta
>>> import astropy.units as u
>>> from nenupy.io.tf_utils import ReducedSpectraSlice
>>> pols = np.array(["I", "Q", "U", "V/I"])
>>> nt, nf, no = (10, 5, pols.size)
>>> data = (np.arange(1, nt + 1)[:, None] * np.arange(1, nf + 1)[None, :])[:, :, None] * 2**np.arange(no)[None, None, :]
>>> dd = ReducedSpectraSlice(
time=Time.now() + np.arange(nt) * TimeDelta(1800, format="sec"),
frequency=np.arange(nf) * u.MHz,
polarization=pols,
data=data
)
>>> f, axes = dd[2:5, ...].multi_plot(2, 2, figsize=(15, 7), norm="linear", vmin=0, vmax=200)
.. figure:: ../_images/io_images/reducedspectra_multiplot.png
:width: 650
:align: center
"""
if fig is None:
fig = plt.figure(figsize=(10 * ncols, 5 * nrows) if figsize is None else figsize)
# Remove some keywords that could double interact with the main figure
for key in ["ax", "title"]:
try:
del kwargs[key]
log.warning(f"Keyword '{key}' set but ignored in this plot configuration.")
except KeyError:
pass
kwargs["clabel"] = None
axes = fig.subplots(nrows=nrows, ncols=ncols, sharex=True, sharey=True)
axes = axes.ravel()
for i, polarization in enumerate(self.polarization):
_, _ = self.plot(
polarization=polarization,
fig=fig,
ax=axes[i],
title=f"Polarization: '{polarization}'",
**kwargs
)
plt.tight_layout()
return fig, axes
[docs]
class ReducedSpectra:
[docs]
def __init__(self, file_name: str):
self.file_name = file_name
self._rfile = h5py.File(file_name, "r")
log.info(f"'{self.file_name}' opened.")
def __getitem__(self, abeam_dbeam_id: Tuple[int, int]):
if len(abeam_dbeam_id) != 2:
raise IndexError("Two indices are expected (analog_beam, digital_beam).")
time, frequency, polar, data = self.get(
subarray_pointing_id=abeam_dbeam_id[0],
beam_id=abeam_dbeam_id[1],
data_key=None
)
return ReducedSpectraSlice(
time=time,
frequency=frequency,
polarization=polar,
data=data
)
[docs]
def info(self, condense: bool = True) -> str:
"""Print out informations about the file structure.
Parameters
----------
condense : `bool`, optional
If set to `True`, the dataet attributes are not shown, by default `True`
Returns
-------
`str`
File informations.
Example
-------
.. code-block:: python
>>> from nenupy.io.tf_utils import ReducedSpectra
>>> rs = ReducedSpectra("/path/to/my/file.hdf5")
>>> print(rs.info())
Data Structure:
--- 'SUB_ARRAY_POINTING_000' ---
SUBARRAY_POINTING_ID: 0
--- 'BEAM_000' ---
BEAM_ID: 0
DATASETS:
'I': (10, 20)
'Q': (10, 20)
'U': (10, 20)
"""
message = ""
s1 = " " * 2
s2 = " " * 6
if not condense:
for key, val in self._rfile.attrs.items():
message += f"{key}: {val}\n"
message += "\nData Structure:\n"
analog_beams = list(self._rfile.keys())
for analog_beam in analog_beams:
message += f"{s1}--- '{analog_beam}' ---\n"
message += f"{s2}SUBARRAY_POINTING_ID: {int(analog_beam.split('_')[-1])}\n"
if not condense:
for key, val in self._rfile[analog_beam].attrs.items():
message += f"{s2}{key}: {val}\n"
digital_beams = list(self._rfile[analog_beam].keys())
for digital_beam in digital_beams:
message += f"\n{s1}{s2}--- '{digital_beam}' ---\n"
message += f"{s1}{s2}{s2}BEAM_ID: {int(digital_beam.split('_')[-1])}\n"
message += f"{s1}{s2}{s2}DATASETS (time, frequency):\n"
for pol in list(self._rfile[analog_beam][digital_beam].keys())[1:]:
message += f"{s1}{s2}{s2}{s1}'{pol}': {(self._rfile[analog_beam][digital_beam][pol].shape)}\n"
if not condense:
for key, val in self._rfile[analog_beam][digital_beam].attrs.items():
message += f"{s1}{s2}{s2}{key}: {val}\n"
return message
[docs]
def get(
self, data_key: str = None, subarray_pointing_id: int = 0, beam_id: int = 0
) -> Tuple[Time, u.Quantity, str, np.ndarray]:
"""_summary_
Parameters
----------
data_key : str, optional
_description_, by default None
subarray_pointing_id : int, optional
_description_, by default 0
beam_id : int, optional
_description_, by default 0
Returns
-------
Tuple[Time, u.Quantity, np.ndarray np.ndarray]
_description_
Raises
------
KeyError
_description_
"""
analog_beam = f"SUB_ARRAY_POINTING_{subarray_pointing_id:03}"
digital_beam = f"BEAM_{beam_id:03}"
data_ext = self._rfile[f"{analog_beam}/{digital_beam}"]
log.info(f"Data from analog/digital beam '{analog_beam}/{digital_beam}' loaded.")
available_keys = list(data_ext.keys())
available_keys.remove("COORDINATES")
if data_key is None:
# If no key is selected, take the first one by default
# data_key = available_keys[0]
log.info("No data_key selected, all datasets will be returned by default.")
polarization = np.array(available_keys)
elif data_key not in available_keys:
raise KeyError(
f"Invalid data_key '{data_key}', available values: {available_keys}."
)
else:
polarization = np.array([data_key])
log.info(f"Selected data extension '{polarization}'.")
times_axis, frequency_axis = self._build_axes(data_ext["COORDINATES"])
return times_axis, frequency_axis, polarization, np.stack([data_ext[pol] for pol in polarization], axis=-1)
def plot(
self,
data_key: str = None,
subarray_pointing_id: int = 0,
beam_id: int = 0,
**kwargs,
):
time, frequency, polar, data = self.get(
subarray_pointing_id=subarray_pointing_id,
beam_id=beam_id,
data_key=data_key
)
try:
fig, ax = plot_dynamic_spectrum(
data=data[:], time=time, frequency=frequency, clabel=polar, **kwargs
)
return fig, ax
except:
raise
def close(self):
self._rfile.close()
log.info(f"'{self.file_name}' closed.")
@staticmethod
def _build_axes(hdf5_coordinates_ext) -> Tuple[Time, u.Quantity]:
"""_summary_
Returns
-------
Tuple[Time, u.Quantity]
_description_
"""
units = hdf5_coordinates_ext.attrs["units"]
time = Time(hdf5_coordinates_ext["time"][:], format=units[0])
frequency = hdf5_coordinates_ext["frequency"][:] * u.Unit(units[1])
return time, frequency