#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
********
XST file
********
.. inheritance-diagram:: nenupy.io.xst.XST nenupy.io.xst.NenufarTV
:parts: 3
.. autosummary::
~XST
~NenufarTV
TV_Image
TV_Nearfield
"""
__author__ = "Alan Loh"
__copyright__ = "Copyright 2023, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = ["XST_Slice", "Crosslet", "XST", "TV_Image", "TV_Nearfield", "NenufarTV"]
from abc import ABC
import os
from itertools import islice
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, AltAz, Angle
import astropy.units as u
from astropy.io import fits
from healpy.fitsfunc import write_map, read_map
from healpy.pixelfunc import mask_bad, nside2resol
import numpy as np
import json
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.colorbar import ColorbarBase
from matplotlib.ticker import LinearLocator
from matplotlib.colors import Normalize
from matplotlib.cm import get_cmap
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import dask.array as da
from dask.diagnostics import ProgressBar
import nenupy
from os.path import join, dirname
from nenupy.astro.target import FixedTarget, SolarSystemTarget
from nenupy.io.io_tools import StatisticsData, ST_Slice
from nenupy.astro import wavelength, altaz_to_radec, l93_to_etrs, etrs_to_enu
from nenupy.astro.uvw import compute_uvw
from nenupy.astro.sky import HpxSky
from nenupy.astro.pointing import Pointing
from nenupy.instru import (
NenuFAR,
MiniArray,
read_cal_table,
freq2sb,
nenufar_miniarrays,
)
from nenupy import nenufar_position, DummyCtMgr
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ------------------------- XST_Slice ------------------------- #
[docs]
class XST_Slice:
"""Class to handle the result of selection upon XST-like data.
.. rubric:: Methods Summary
.. autosummary::
~XST_Slice.plot_correlaton_matrix
~XST_Slice.rephase_visibilities
~XST_Slice.make_image
~XST_Slice.make_nearfield
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self, mini_arrays, time, frequency, value):
self.mini_arrays = mini_arrays
self.time = time
self.frequency = frequency
self.value = value
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def plot_correlaton_matrix(self, mask_autocorrelations: bool = False, **kwargs):
"""Plots the cross-correlation matrix.
:param mask_autocorrelations:
If set to ``True``, the auto-correlation diagnoal
is hidden.
Default is ``False``.
:type mask_autocorrelations:
`bool`
Several parameters, listed below, can be tuned to adapt the plot
to the user requirements:
.. rubric:: Data display keywords
:param decibel:
If set to ``True``, the data will be displayed in
a decibel scale (i.e., :math:`{\rm dB} = 10 \log_{10}({\rm data})`).
Default is ``True``.
:type decibel:
`bool`
:param vmin:
*Dynamic spectrum plot only*.
Minimal data value to display.
:type vmin:
`float`
:param vmax:
*Dynamic spectrum plot only*.
Maximal data value to display.
:type vmax:
`float`
.. rubric:: Plotting layout keywords
:param figname:
Name of the file (absolute or relative path) to save the figure.
Default is ``''`` (i.e., only show the figure).
:type figname:
`str`
:param figsize:
Set the figure size.
Default is ``(10, 10)``.
:type figsize:
`tuple`
:param title:
Set the figure title.
Default is ``''``.
:type title:
`str`
:param colorbar_label:
*Dynamic spectrum plot only*.
Label of the color bar.
Default is ``'Amp'`` if ``decibel=False`` and ``'dB'`` otherwise.
:type colorbar_label:
`str`
:param cmap:
*Dynamic spectrum plot only*.
Color map used to represent the data.
Default is ``'YlGnBu_r'``.
:type cmap:
`str`
:Example:
.. code-block:: python
from nenupy.io.xst import XST
xst = XST("/path/to/XST.fits")
data = xst.get....
"""
max_ma_index = self.mini_arrays.max() + 1
all_mas = np.arange(max_ma_index)
matrix = np.full([max_ma_index, max_ma_index], np.nan, "complex")
ma1, ma2 = np.tril_indices(self.mini_arrays.size, 0)
for ma in all_mas:
if ma not in self.mini_arrays:
ma1[ma1 >= ma] += 1
ma2[ma2 >= ma] += 1
mask = None
if mask_autocorrelations:
mask = ma1 != ma2 # cross_correlation mask
matrix[ma2[mask], ma1[mask]] = np.mean(self.value, axis=(0, 1))[mask]
fig = plt.figure(figsize=kwargs.get("figsize", (10, 10)))
ax = fig.add_subplot(111)
ax.set_aspect("equal")
data = np.absolute(matrix)
if kwargs.get("decibel", True):
data = 10 * np.log10(data)
im = ax.pcolormesh(
all_mas,
all_mas,
data,
shading="nearest",
cmap=kwargs.get("cmap", "YlGnBu"),
vmin=kwargs.get("vmin", np.nanmin(data)),
vmax=kwargs.get("vmax", np.nanmax(data)),
)
ax.set_xticks(all_mas[::2])
ax.set_yticks(all_mas[::2])
ax.grid(alpha=0.5)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.3)
cbar = fig.colorbar(im, cax=cax)
cbar.set_label(
kwargs.get("colorbar_label", "dB" if kwargs.get("decibel", True) else "Amp")
)
# Axis abels
ax.set_xlabel(f"Mini-Array index")
ax.set_ylabel(f"Mini-Array index")
# Title
ax.set_title(kwargs.get("title", ""))
# Save or show the figure
figname = kwargs.get("figname", "")
if figname != "":
plt.savefig(figname, dpi=300, bbox_inches="tight", transparent=True)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close("all")
[docs]
def rephase_visibilities(self, phase_center, uvw):
""" """
log.info(f"Rephasing the visibilities towards {phase_center}...")
# Compute the zenith original phase center
zenith = SkyCoord(
np.zeros(self.time.size),
np.ones(self.time.size) * 90,
unit="deg",
frame=AltAz(obstime=self.time, location=nenufar_position),
)
zenith_phase_center = altaz_to_radec(zenith)
# Define the rotation matrix
def rotation_matrix(skycoord):
""" """
ra_rad = skycoord.ra.rad
dec_rad = skycoord.dec.rad
if np.isscalar(ra_rad):
ra_rad = np.array([ra_rad])
dec_rad = np.array([dec_rad])
cos_ra = np.cos(ra_rad)
sin_ra = np.sin(ra_rad)
cos_dec = np.cos(dec_rad)
sin_dec = np.sin(dec_rad)
return np.array(
[
[cos_ra, -sin_ra, np.zeros(ra_rad.size)],
[-sin_ra * sin_dec, -cos_ra * sin_dec, cos_dec],
[sin_ra * cos_dec, cos_ra * cos_dec, sin_dec],
]
)
# Transformation matrices
to_origin = rotation_matrix(zenith_phase_center) # (3, 3, ntimes)
to_new_center = rotation_matrix(phase_center) # (3, 3, 1)
total_transformation = np.matmul(
np.transpose(to_new_center, (2, 0, 1)), to_origin
) # (3, 3, ntimes)
rotUVW = np.matmul(
np.expand_dims((to_origin[2, :] - to_new_center[2, :]).T, axis=1),
np.transpose(to_origin, (2, 1, 0)),
) # (ntimes, 1, 3)
phase = np.matmul(rotUVW, np.transpose(uvw, (0, 2, 1))) # (ntimes, 1, nvis)
rotate_visibilities = np.exp(
2.0j
* np.pi
* phase
/ wavelength(self.frequency).to(u.m).value[None, :, None]
) # (ntimes, nfreqs, nvis)
new_uvw = np.matmul(
uvw, np.transpose(total_transformation, (2, 0, 1)) # (ntimes, nvis, 3)
)
return rotate_visibilities, new_uvw
[docs]
def make_image(
self,
resolution: u.Quantity = 1 * u.deg,
fov_radius: u.Quantity = 25 * u.deg,
phase_center: SkyCoord = None,
stokes: str = "I",
) -> HpxSky:
"""
:Example:
xst = XST("XST.fits")
data = xst.get_stokes("I")
sky = data.make_image(
resolution=0.5*u.deg,
fov_radius=27*u.deg,
phase_center=SkyCoord(277.382, 48.746, unit="deg")
)
sky[0, 0, 0].plot(
center=SkyCoord(277.382, 48.746, unit="deg"),
radius=24.5*u.deg
)
"""
exposure = self.time[-1] - self.time[0]
# Compute XST UVW coordinates (zenith phased)
uvw = (
compute_uvw(
interferometer=NenuFAR()[self.mini_arrays],
phase_center=None, # will be zenith
time=self.time,
)
.to(u.m)
.value
)
# Prepare visibilities rephasing
if phase_center is None:
phase_center = SkyCoord(
0,
90,
unit="deg",
frame=AltAz(
obstime=self.time[0] + exposure / 2, location=nenufar_position
),
).transform_to("icrs")
rephase_matrix = 1.0
else:
rephase_matrix, uvw = self.rephase_visibilities(
phase_center=phase_center, uvw=uvw
)
# Mask auto-correlations
ma1, ma2 = np.tril_indices(self.mini_arrays.size, 0)
cross_mask = ma1 != ma2
uvw = uvw[:, cross_mask, :]
# Transform to lambda units
wvl = wavelength(self.frequency).to(u.m).value
uvw = uvw[:, None, :, :] / wvl[None, :, None, None] # (t, f, bsl, 3)
# Mean in time
uvw = np.mean(uvw, axis=0)
# Prepare the sky
sky = HpxSky(
resolution=resolution,
time=self.time[0] + exposure / 2,
frequency=np.mean(self.frequency),
polarization=np.array([stokes]),
value=np.nan,
)
# Compute LMN coordinates
log.info("Preparing the sky...")
image_mask = sky.visible_mask[0, 0, 0]
image_mask *= sky.coordinates.separation(phase_center) <= fov_radius
l, m, n = sky.compute_lmn(phase_center=phase_center, coordinate_mask=image_mask)
lmn = np.array([l, m, (n - 1)], dtype=np.float32).T
n_pix = l.size
lmn = da.from_array(lmn, chunks=(np.floor(n_pix / os.cpu_count()), 3))
# Transform to Dask array
n_bsl = uvw.shape[1]
n_freq = self.frequency.size
n_pix = l.size
uvw = da.from_array(
uvw.astype(np.float32),
chunks=(n_freq, max(1, np.floor(n_bsl / os.cpu_count())), 3),
)
# Compute the phase
uvwlmn = np.sum(uvw[:, :, None, :] * lmn[None, None, :, :], axis=-1)
phase = np.exp(-2j * np.pi * uvwlmn) # (f, bsl, npix)
# Rephase and average visibilites
vis = np.mean(self.value * rephase_matrix, axis=0)[ # Mean in time
..., cross_mask
] # (nfreqs, nvis)
# Make dirty image
dirty = np.nanmean( # mean in baselines
np.real(np.mean(vis[:, :, None] * phase, axis=0)), axis=0 # mean in freq
)
# Insert dirty image in Sky object
log.info(
f"Computing image (time: {self.time.size}, frequency: {self.frequency.size}, baselines: {vis.shape[1]}, pixels: {phase.shape[-1]})... "
)
with ProgressBar() if log.getEffectiveLevel() <= logging.INFO else DummyCtMgr():
sky.value[0, 0, 0, image_mask] = dirty.compute()
return sky
[docs]
def make_nearfield(
self, radius: u.Quantity = 400 * u.m, npix: int = 64, sources: list = []
):
r"""Computes the Near-field image from the cross-correlation
statistics data :math:`\mathcal{V}`.
The distances between each Mini-Array :math:`{\rm MA}_i`
and the ground positions :math:`\Delta` is:
.. math::
d_{\rm{MA}_i} (x, y) = \sqrt{
({\rm MA}_{i, x} - \Delta_x)^2 + ({\rm MA}_{i, y} - \Delta_y)^2 + \left( {\rm MA}_{i, z} - \sum_j \frac{{\rm MA}_{j, z}}{n_{\rm MA}} - 1 \right)^2
}
Then, the near-field image :math:`n_f` can be retrieved
as follows (:math:`k` and :math:`l` being two distinct
Mini-Arrays):
.. math::
n_f (x, y) = \sum_{k, l} \left| \sum_{\nu} \langle \mathcal{V}_{\nu, k, l}(t) \rangle_t e^{2 \pi i \left( d_{{\rm MA}_k} - d_{{\rm MA}_l} \right) (x, y) \frac{\nu}{c}} \right|
.. note::
To simulate astrophysical source of brightness :math:`\mathcal{B}`
footprint on the near-field, its visibility per baseline
of Mini-Arrays :math:`k` and :math:`l` are computed as:
.. math::
\mathcal{V}_{{\rm simu}, k, l} = \mathcal{B} e^{2 \pi i \left( \mathbf{r}_k - \mathbf{r}_l \right) \cdot \mathbf{u} \frac{\nu}{c}}
with :math:`\mathbf{r}` the ENU position of the Mini-Arrays,
:math:`\mathbf{u} = \left( \cos(\theta) \sin(\phi), \cos(\theta) \cos(\phi), sin(\theta) \right)`
the ground projection vector (in East-North-Up coordinates),
(:math:`\phi` and :math:`\theta` are the source horizontal
coordinates azimuth and elevation respectively).
:param radius:
Radius of the ground image. Default is ``400m``.
:type radius:
:class:`~astropy.units.Quantity`
:param npix:
Number of pixels of the image size. Default is ``64``.
:type npix:
`int`
:param sources:
List of source names for which their near-field footprint
may be computed. Only sources above 10 deg elevation
will be considered.
:type sources:
`list`
:returns:
Tuple of near-field image and a dictionnary
containing all source footprints.
:rtype:
(:class:`~numpy.ndarray`, `dict`)
:Example:
.. code-block:: python
from nenupy.io.xst import XST
xst = XST("xst_file.fits")
nearfield, src_dict = xst.make_nearfield(sources=["Cas A", "Sun"])
.. versionadded:: 1.1.0
"""
def compute_nearfield_imprint(visibilities, phase):
# Phase and average in frequency
nearfield = np.mean(visibilities[..., None, None] * phase, axis=0)
# Average in baselines
nearfield = np.nanmean(np.abs(nearfield), axis=0)
with ProgressBar() if log.getEffectiveLevel() <= logging.INFO else DummyCtMgr():
return nearfield.compute()
# Mini-Array positions in ENU coordinates
nenufar = NenuFAR()[self.mini_arrays]
ma_etrs = l93_to_etrs(nenufar.antenna_positions)
ma_enu = etrs_to_enu(ma_etrs)
# Treat baselines
ma1, ma2 = np.tril_indices(self.mini_arrays.size, 0)
cross_mask = ma1 != ma2
# Mean time of observation
obs_time = self.time[0] + (self.time[-1] - self.time[0]) / 2.0
# Delays at the ground
radius_m = radius.to(u.m).value
ground_granularity = np.linspace(-radius_m, radius_m, npix)
posx, posy = np.meshgrid(ground_granularity, ground_granularity)
posz = np.ones_like(posx) * (np.average(ma_enu[:, 2]) + 1)
ground_grid = np.stack((posx, posy, posz), axis=2)
ground_distances = np.sqrt(
np.sum((ma_enu[:, None, None, :] - ground_grid[None]) ** 2, axis=-1)
)
grid_delays = (
ground_distances[ma1] - ground_distances[ma2]
) # (nvis, npix, npix)
n_bsl = ma1[cross_mask].size
grid_delays = da.from_array(
grid_delays[cross_mask],
chunks=(max(1, np.floor(n_bsl / os.cpu_count())), npix, npix),
)
# Mean in time the visibilities
vis = np.mean(self.value, axis=0)[..., cross_mask] # (nfreqs, nvis)
vis = da.from_array(
vis,
chunks=(
1,
max(1, np.floor(n_bsl / os.cpu_count())),
), # (self.frequency.size, np.floor(n_bsl/os.cpu_count()))
)
# Make the nearfield image
log.info(
f"Computing nearfield (time: {self.time.size}, frequency: {self.frequency.size}, baselines: {vis.shape[1]}, pixels: {posx.size})... "
)
wvl = wavelength(self.frequency).to(u.m).value
phase = np.exp(
2.0j * np.pi * (grid_delays[None, ...] / wvl[:, None, None, None])
)
log.debug("Computing the phase term...")
with ProgressBar() if log.getEffectiveLevel() <= logging.INFO else DummyCtMgr():
phase = phase.compute()
log.debug("Computing the nearf-field...")
nearfield = compute_nearfield_imprint(vis, phase)
# Compute nearfield imprints for other sources
simu_sources = {}
for src_name in sources:
# Check that the source is visible
if src_name.lower() in [
"sun",
"moon",
"venus",
"mars",
"jupiter",
"saturn",
"uranus",
"neptune",
]:
src = SolarSystemTarget.from_name(name=src_name, time=obs_time)
else:
src = FixedTarget.from_name(name=src_name, time=obs_time)
altaz = src.horizontal_coordinates # [0]
if altaz.alt.deg <= 10:
log.debug(
f"{src_name}'s elevation {altaz[0].alt.deg}<=10deg, not considered for nearfield imprint."
)
continue
# Projection from AltAz to ENU vector
az_rad = altaz.az.rad
el_rad = altaz.alt.rad
cos_az = np.cos(az_rad)
sin_az = np.sin(az_rad)
cos_el = np.cos(el_rad)
sin_el = np.sin(el_rad)
to_enu = np.array([cos_el * sin_az, cos_el * cos_az, sin_el])
# src_delays = np.matmul(
# ma_enu[ma1] - ma_enu[ma2],
# to_enu
# )
# src_delays = da.from_array(
# src_delays[cross_mask, :],
# chunks=((np.floor(n_bsl/os.cpu_count()), npix, npix), 1)
# )
ma1_enu = da.from_array(
ma_enu[ma1[cross_mask]], chunks=max(1, np.floor(n_bsl / os.cpu_count()))
)
ma2_enu = da.from_array(
ma_enu[ma2[cross_mask]], chunks=max(1, np.floor(n_bsl / os.cpu_count()))
)
src_delays = np.matmul(ma1_enu - ma2_enu, to_enu)
# Simulate visibilities
src_vis = np.exp(2.0j * np.pi * (src_delays / wvl))
src_vis = np.swapaxes(src_vis, 1, 0)
log.debug(f"Computing the nearf-field imprint of {src_name}...")
simu_sources[src_name] = compute_nearfield_imprint(src_vis, phase)
return nearfield, simu_sources
# ============================================================= #
# ------------------------- Crosslet -------------------------- #
[docs]
class Crosslet(ABC):
"""Crosslet abstract class (both for XST and NenuFAR TV dat files).
.. rubric:: Methods Summary
.. autosummary::
~Crosslet.get
~Crosslet.get_stokes
~Crosslet.get_beamform
.. rubric:: Attributes and Methods Documentation
"""
# def __init__(self,
# mini_arrays: np.ndarray,
# frequency: u.Quantity,
# time: Time,
# visibilities: np.ndarray
# ):
# self.mini_arrays = mini_arrays
# self.frequency = frequency
# self.time = time
# self.visibilities = visibilities
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get(
self,
polarization: str = "XX",
miniarray_selection: np.ndarray = None,
frequency_selection: str = None,
time_selection: str = None,
) -> XST_Slice:
""" """
mas = self._select_mini_arrays(miniarray_selection)
frequency_mask = self._get_freq_mask(frequency_selection)
time_mask = self._get_time_mask(time_selection)
return XST_Slice(
mini_arrays=mas,
time=self.time[time_mask],
frequency=self.frequencies[frequency_mask],
value=self._get(
frequency_selection=frequency_selection,
time_selection=time_selection,
polarization=polarization,
mini_arrays=mas,
),
)
[docs]
def get_stokes(
self,
stokes: str = "I",
miniarray_selection: np.ndarray = None,
frequency_selection: str = None,
time_selection: str = None,
) -> XST_Slice:
r"""
``frequency_selection`` and ``time_selection``
arguments accept `str` values formatted as, e.g.,
``'>={value}'`` or ``'>={value_1} & <{value_2}'`` or ``'=={value}'``.
:param frequency_selection:
Frequency selection. The expected ``'{value}'`` format is frequency units, e.g. ``'>=50MHz'`` or ``'< 1 GHz'``.
Default is ``None`` (i.e., no selection upon frequency).
:type frequency_selection:
`str`
:param time_selection:
Time selection. The expected ``'{value}'`` format is ISOT, e.g. ``'>=2022-01-01T12:00:00'``.
Default is ``None`` (i.e., no selection upon time).
:type time_selection:
`str`
:param stokes:
Stokes parameters to return
:type stokes:
`str`
.. math::
\begin{cases}
\rm{I} = \frac{1}{2}(\rm{XX} + \rm{YY})\\
\rm{Q} = \frac{1}{2}(\rm{XX} - \rm{YY})\\
\rm{U} = \frac{1}{2}(\rm{XY} + \rm{YX})\\
\rm{V} = \frac{-i}{2}(\rm{XY} - \rm{YX})\\
\frac{\rm{L}}{\rm{I}} = \frac{\sqrt{\rm{Q}^2 + \rm{U}^2}}{\rm{I}}\\
\frac{\rm{V}}{\rm{I}} = \frac{\rm{V}}{\rm{I}}\\
\end{cases}
"""
mas = self._select_mini_arrays(miniarray_selection)
frequency_mask = self._get_freq_mask(frequency_selection)
time_mask = self._get_time_mask(time_selection)
stokes_parameters = {
"I": {"cross": ["XX", "YY"], "compute": lambda xx, yy: 0.5 * (xx + yy)},
"Q": {"cross": ["XX", "YY"], "compute": lambda xx, yy: 0.5 * (xx - yy)},
"U": {"cross": ["XY", "YX"], "compute": lambda xy, yx: 0.5 * (xy + yx)},
"V": {"cross": ["XY", "YX"], "compute": lambda xy, yx: -0.5j * (xy - yx)},
"FL": {
"cross": ["XX", "YY", "XY", "YX"],
"compute": lambda xx, yy, xy, yx: np.sqrt(
(0.5 * (xx - yy)) ** 2 + (0.5 * (xy + yx)) ** 2
)
/ (0.5 * (xx + yy)),
},
"FV": {
"cross": ["XX", "YY", "XY", "YX"],
"compute": lambda xx, yy, xy, yx: np.abs(-0.5j * (xy - yx))
/ (0.5 * (xx + yy)),
},
}
try:
selected_stokes = stokes_parameters[stokes]
except KeyError:
log.warning(f"Available polarizations are: {stokes_parameters.keys()}.")
raise
return XST_Slice(
mini_arrays=mas,
time=self.time[time_mask],
frequency=self.frequencies[frequency_mask],
value=selected_stokes["compute"](
*map(
lambda pol: self._get(
frequency_selection=frequency_selection,
time_selection=time_selection,
polarization=pol,
mini_arrays=mas,
),
selected_stokes["cross"],
)
),
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _select_mini_arrays(self, mini_arrays):
""" """
if mini_arrays is None:
mini_arrays = self.mini_arrays
if np.any(~np.isin(mini_arrays, self.mini_arrays)):
raise IndexError(
f"Selected Mini-Arrays {mini_arrays} are outside possible values: {self.mini_arrays}."
)
return mini_arrays
def _get_freq_mask(self, frequency_selection=None):
""" """
# Frequency selection
frequencies = self.frequencies
if frequency_selection is None:
frequency_selection = f">={frequencies.min()} & <= {frequencies.max()}"
frequency_mask = self._parse_frequency_condition(frequency_selection)(
frequencies
)
if not np.any(frequency_mask):
log.warning(
"Empty frequency selection, input values should fall "
f"between {frequencies.min()} and {frequencies.max()}, "
f"i.e.: '>={frequencies.min()} & <= {frequencies.max()}'"
)
return frequency_mask
def _get_time_mask(self, time_selection=None):
""" """
# Time selection
if time_selection is None:
time_selection = f">={self.time[0].isot} & <= {self.time[-1].isot}"
time_mask = self._parse_time_condition(time_selection)(self.time)
if not np.any(time_mask):
log.warning(
"Empty time selection, input values should fall "
f"between {self.time[0].isot} and {self.time[-1].isot}, "
f"i.e.: '>={self.time[0].isot} & <= {self.time[-1].isot}'"
)
return time_mask
def _get_cross_idx(self, c1="X", c2="X", mini_arrays=None):
"""Retrieves visibilities indices for the given cross polarizations"""
mini_arrays_size = self.mini_arrays.size
# Mini-arrays selection
ma_indices = np.arange(mini_arrays_size, dtype="int")[
np.isin(self.mini_arrays, mini_arrays)
]
# Polarization array
corr = np.array(["X", "Y"] * mini_arrays_size)
i_ant1, i_ant2 = np.tril_indices(mini_arrays_size * 2, 0)
# Define polarization and mini-arrays masks
corr_mask = (corr[i_ant1] == c1) & (corr[i_ant2] == c2)
ma_mask = np.isin(i_ant1 // 2, ma_indices) & np.isin(i_ant2 // 2, ma_indices)
indices = np.arange(i_ant1.size)[corr_mask & ma_mask]
return indices
def _get(
self,
frequency_selection: str = None,
time_selection: str = None,
polarization: str = "XX",
mini_arrays: np.ndarray = None,
) -> np.ndarray:
""" """
# Polarization selection
allowed_polarizations = ["XX", "XY", "YX", "YY"]
if polarization not in allowed_polarizations:
raise ValueError(
f"'polarization' argument must be one of the following: {allowed_polarizations}."
)
# Frequency selection
frequency_mask = self._get_freq_mask(frequency_selection)
# Time selection
time_mask = self._get_time_mask(time_selection)
# Combined mask
time_freq_mask = time_mask[:, None] * frequency_mask
# Final shape
if mini_arrays is None:
mini_arrays = self.mini_arrays
ma1, ma2 = np.tril_indices(mini_arrays.size, 0)
# final_shape = (np.sum(time_mask), np.sum(frequency_mask), ma1.size)
final_shape = (
np.any(time_freq_mask, axis=1).sum(),
np.any(time_freq_mask, axis=0).sum(),
ma1.size,
)
if polarization == "XY":
# Deal with lack of auto XY cross in XST-like data
auto_mask = ma1 == ma2
cross_mask = ~auto_mask
data_tf_selected = self.data[time_freq_mask]
# Deal with dimension cut in case of True over an entire axis
if final_shape[0] == 1:
data_tf_selected = np.expand_dims(data_tf_selected, axis=0)
if final_shape[1] == 1:
data_tf_selected = np.expand_dims(data_tf_selected, axis=1)
yx = data_tf_selected[:, :, self._get_cross_idx("Y", "X", mini_arrays)]
_xy = np.zeros((list(yx.shape[:-1]) + [ma1.size]), dtype=complex)
_xy[:, :, auto_mask] = yx[:, :, auto_mask].conj()
# Get XY correlations
_xy[:, :, cross_mask] = data_tf_selected[
:, :, self._get_cross_idx("X", "Y", mini_arrays)
]
return _xy.reshape(final_shape)
else:
return self.data[time_freq_mask[:, :]][
:, self._get_cross_idx(*list(polarization), mini_arrays)
].reshape(final_shape)
# return self.data[
# np.ix_(
# time_mask,
# frequency_mask,
# self._get_cross_idx(*list(polarization), mini_arrays)
# )
# ].reshape(final_shape)
# ============================================================= #
# ---------------------------- XST ---------------------------- #
[docs]
class XST(StatisticsData, Crosslet):
"""Crosslet STatistics reading class.
.. rubric:: Attributes Summary
.. autosummary::
~XST.mini_arrays
.. rubric:: Methods Summary
.. autosummary::
~Crosslet.get
~Crosslet.get_stokes
~Crosslet.get_beamform
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self, file_name):
super().__init__(file_name=file_name)
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def mini_arrays(self):
"""Retrieves the list of Mini-Arrays used to get the cross-correlations.
:getter: Mini-Arrays list.
:type: :class:`~numpy.ndarray`
"""
return self._meta_data["ins"]["noMROn"][0]
# ============================================================= #
# ------------------------- TV_Image -------------------------- #
[docs]
class TV_Image:
""" """
def __init__(
self, tv_image: HpxSky, analog_pointing: SkyCoord, fov_radius: u.Quantity
):
self.tv_image = tv_image
self.analog_pointing = analog_pointing
self.fov_radius = fov_radius
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
@classmethod
def from_fits(cls, file_name):
""" """
header = fits.getheader(file_name, ext=1)
# Load the image
image = read_map(file_name, dtype=None, partial="PARTIAL" in header["OBJECT"])
# Fill NaNs
if "PARTIAL" in header["OBJECT"]:
image[mask_bad(image)] = np.nan
# Recreate the sky
sky = HpxSky(
resolution=Angle(
angle=nside2resol(header["NSIDE"], arcmin=True), unit=u.arcmin
),
time=Time(header["OBSTIME"]),
frequency=header["FREQ"] * u.MHz,
polarization=np.array([header["STOKES"]]),
value=image.reshape((1, 1, 1, image.size)),
)
return cls(
tv_image=sky,
analog_pointing=SkyCoord(
header["AZANA"] * u.deg,
header["ELANA"] * u.deg,
frame=AltAz(obstime=Time(header["OBSTIME"]), location=nenufar_position),
),
fov_radius=header["FOV"] * u.deg / 2,
)
[docs]
def save_fits(self, file_name: str, partial: bool = True):
""" """
phase_center_eq = altaz_to_radec(self.analog_pointing)
header = [
("software", "nenupy"),
("version", nenupy.__version__),
("contact", nenupy.__email__),
("azana", self.analog_pointing.az.deg),
("elana", self.analog_pointing.alt.deg),
("freq", self.tv_image.frequency[0].to(u.MHz).value),
("obstime", self.tv_image.time[0].isot),
("fov", self.fov_radius.to(u.deg).value * 2),
("pc_ra", phase_center_eq.ra.deg),
("pc_dec", phase_center_eq.dec.deg),
("stokes", self.tv_image.polarization[0]),
]
map2write = self.tv_image.value[0, 0, 0].copy()
write_map(
filename=file_name,
m=map2write,
nest=False,
coord="C",
overwrite=True,
dtype=self.tv_image.value.dtype,
extra_header=header,
partial=partial,
)
log.info(
"HEALPix image of {} cells (nside={}) saved in `{}`.".format(
map2write.size, self.tv_image.nside, file_name
)
)
[docs]
def save_png(
self,
figname: str = "",
beam_contours: bool = True,
show_sources: bool = True,
**kwargs,
):
""" """
image_center = altaz_to_radec(
SkyCoord(
self.analog_pointing.az,
self.analog_pointing.alt,
frame=AltAz(obstime=self.tv_image.time[0], location=nenufar_position),
)
)
# kwargs = {}
if show_sources:
src_names = []
src_position = []
with open(join(dirname(__file__), "nenufar_tv_sources.json")) as src_file:
sources = json.load(src_file)
for name in sources["FixedSources"]:
src = FixedTarget.from_name(name, time=self.tv_image.time[0])
if src.coordinates.separation(image_center) <= 0.8 * self.fov_radius:
src_names.append(name)
src_position.append(src.coordinates)
for name in sources["SolarSystemSources"]:
src = SolarSystemTarget.from_name(name, time=self.tv_image.time[0])
if src.coordinates.separation(image_center) <= 0.8 * self.fov_radius:
src_names.append(name)
src_position.append(src.coordinates)
if len(src_position) != 0:
kwargs["text"] = (SkyCoord(src_position), src_names, "white")
if beam_contours:
# Simulate the array factor
ma = MiniArray()
af_sky = ma.array_factor(
sky=HpxSky(
resolution=0.2 * u.deg,
time=self.tv_image.time[0],
frequency=self.tv_image.frequency[0],
),
pointing=Pointing(coordinates=image_center, time=self.tv_image.time[0]),
)
# Normalize the array factor
af = af_sky[0, 0, 0].compute()
af_normalized = af / af.max()
kwargs["contour"] = (af_normalized, np.arange(0.5, 1, 0.2), "copper")
# Plot
self.tv_image[0, 0, 0].plot(
center=image_center,
radius=self.fov_radius - 2.5 * u.deg,
figname=figname,
colorbar_label=f"Stokes {self.tv_image.polarization[0]}",
**kwargs,
)
# ============================================================= #
# ----------------------- TV_Nearfield ------------------------ #
[docs]
class TV_Nearfield:
""" """
def __init__(
self,
nearfield: np.ndarray,
source_imprints: dict,
npix: int,
time: Time,
frequency: u.Quantity,
radius: u.Quantity,
mini_arrays: np.ndarray,
stokes: str,
):
self.nearfield = nearfield
self.source_imprints = source_imprints
self.npix = npix
self.time = time
self.frequency = frequency
self.radius = radius
self.mini_arrays = mini_arrays
self.stokes = stokes
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
@classmethod
def from_fits(cls, file_name: str):
"""Loads a nearfield previously stored in a FITS file.
:param file_name:
Path to the FITS file containing a near-field
image (whose format is such as created by the
:meth:`~nenupy.io.xst.TV_Nearfield.save_fits`
method).
:type file_name:
`str`
:returns:
Instance of :class:`~nenupy.io.xst.TV_Nearfield`.
:rtype:
:class:`~nenupy.io.xst.TV_Nearfield`
:Example:
from nenupy.io.xst import TV_Nearfield
nf = TV_Nearfield.from_fits("/path/to/nearfield.fits")
"""
reserved_names = ["PRIMARY", "NEAR-FIELD", "MINI-ARRAYS"]
hdus = fits.open(file_name)
nf_header = hdus["NEAR-FIELD"].header
nf = cls(
nearfield=hdus["NEAR-FIELD"].data,
mini_arrays=hdus["MINI-ARRAYS"].data,
npix=nf_header["NAXIS1"],
frequency=nf_header["FREQUENC"] * u.MHz,
time=Time(nf_header["DATE-OBS"]),
source_imprints={
hdu.header["SOURCE"]: hdu.data
for hdu in hdus
if hdu.name not in reserved_names
},
radius=nf_header["RADIUS"] * u.m,
stokes=nf_header["STOKES"],
)
return nf
[docs]
def save_fits(self, file_name: str):
"""Saves a nearfield made from NenuFAR TV data as a FITS file.
:param file_name:
Name of the file to save.
:type file_name:
`str`
:Example:
from nenupy.io.xst import NenufarTV
tv = NenufarTV("20191204_132113_nenufarTV.dat")
nf_object = tv.compute_nearfield_tv(sources=["Cyg A", "Cas A", "Sun"])
nf_object.save_fits(file_name="/path/to/nearfield.fits")
"""
# Header
prim_header = fits.Header()
prim_header["OBSERVER"] = "NenuFAR Team"
prim_header["AUTHOR"] = f"nenupy {nenupy.__version__}"
prim_header["DATE"] = Time.now().isot
prim_header["INSTRUME"] = "XST"
prim_header["OBSERVER"] = "NenuFAR-TV"
prim_header[
"ORIGIN"
] = "Station de Radioastronomie de Nancay, LESIA, Observatoire de Paris"
prim_header[
"REFERENC"
] = "Alan Loh and the NenuFAR team, nenupy, 2020 (DOI: 10.5281/zenodo.3775196.)"
prim_header["TELESCOP"] = "NenuFAR"
prim_hdu = fits.PrimaryHDU(header=prim_header)
# Near-Field
nf_header = fits.Header()
nf_header["NAXIS"] = 2
nf_header["NAXIS1"] = self.npix
nf_header["NAXIS2"] = self.npix
nf_header["DATE-OBS"] = (self.time.isot, "Mean observation UTC date")
nf_header["DATAMIN"] = self.nearfield.min()
nf_header["DATAMAX"] = self.nearfield.max()
nf_header["FREQUENC"] = (
self.frequency.to(u.MHz).value,
"Mean observing frequency in MHz.",
)
nf_header["STOKES"] = self.stokes.upper()
nf_header["DESCRIPT"] = "Near-Field image."
nf_header["RADIUS"] = (
self.radius.to(u.m).value,
"Radius of the ground (in m).",
)
nf_hdu = fits.ImageHDU(data=self.nearfield, header=nf_header, name="Near-Field")
# Mini-Arrays
ant_header = fits.Header()
ant_header["DESCRIPT"] = "Mini-Array names"
ant_hdu = fits.ImageHDU(
data=self.mini_arrays, header=ant_header, name="Mini-Arrays"
)
# HDU list
hduList = fits.HDUList([prim_hdu, nf_hdu, ant_hdu])
for src in self.source_imprints:
hdu_name = src.replace(" ", "_")
src_header = fits.Header()
src_header["NAXIS"] = 2
src_header["NAXIS1"] = self.npix
src_header["NAXIS2"] = self.npix
src_header["DATE-OBS"] = (self.time.isot, "Mean observation UTC date")
src_header["DATAMIN"] = self.source_imprints[src].min()
src_header["DATAMAX"] = self.source_imprints[src].max()
src_header["FREQUENC"] = (
self.frequency.to(u.MHz).value,
"Mean observing frequency in MHz.",
)
src_header["STOKES"] = self.stokes.upper()
src_header["SOURCE"] = (src, "Name of the source imprint on the near-field")
src_header["DESCRIPT"] = "Normalized sky source imprint on the near-field."
src_header["RADIUS"] = (
self.radius.to(u.m).value,
"Radius of the ground (in m).",
)
src_hdu = fits.ImageHDU(
data=self.source_imprints[src], name=hdu_name, header=src_header
)
hduList.append(src_hdu)
hduList.writeto(file_name, overwrite=True)
log.info(f"NearField saved in {file_name}.")
[docs]
def save_png(self, figname: str = "", **kwargs):
""" """
radius = self.radius.to(u.m).value
colormap = kwargs.get("cmap", "YlGnBu_r")
# Mini-Array positions in ENU coordinates
nenufar = NenuFAR()[self.mini_arrays]
ma_etrs = l93_to_etrs(nenufar.antenna_positions)
ma_enu = etrs_to_enu(ma_etrs)
# Plot the nearfield
fig, ax = plt.subplots(figsize=kwargs.get("figsize", (10, 10)))
nf_image_db = 10 * np.log10(self.nearfield)
ax.imshow(
np.flipud(nf_image_db), # This needs to be understood...
cmap=colormap,
extent=[-radius, radius, -radius, radius],
zorder=0,
vmin=kwargs.get("vmin", np.min(nf_image_db)),
vmax=kwargs.get("vmax", np.max(nf_image_db)),
)
# Colorbar
cax = inset_axes(
ax,
width="5%",
height="100%",
loc="lower left",
bbox_to_anchor=(1.05, 0.0, 1, 1),
bbox_transform=ax.transAxes,
borderpad=0,
)
cb = ColorbarBase(
cax,
cmap=get_cmap(name=colormap),
orientation="vertical",
norm=Normalize(
vmin=kwargs.get("vmin", np.min(nf_image_db)),
vmax=kwargs.get("vmax", np.max(nf_image_db)),
),
ticks=LinearLocator(),
format="%.2f",
)
cb.solids.set_edgecolor("face")
cb.set_label(f"dB (Stokes {self.stokes})")
# Show the contour of the simulated source imprints
ground_granularity = np.linspace(-radius, radius, self.npix)
posx, posy = np.meshgrid(ground_granularity, ground_granularity)
dist = np.sqrt(posx**2 + posy**2)
border_min = 0.1 * self.npix
border_max = self.npix - 0.1 * self.npix
for src in self.source_imprints.keys():
# Normalize the imprint
imprint = self.source_imprints[src]
imprint /= imprint.max()
# Plot the contours
ax.contour(
imprint,
np.arange(0.8, 1, 0.04),
cmap="copper",
alpha=0.5,
extent=[-radius, radius, -radius, radius],
zorder=5,
)
# Find the maximum of the emission
max_y, max_x = np.unravel_index(imprint.argmax(), imprint.shape)
# If maximum outside the plot, recenter it
if (
(max_x <= border_min)
or (max_y <= border_min)
or (max_x >= border_max)
or (max_y >= border_max)
):
dist[dist <= np.median(dist)] = 0
max_y, max_x = np.unravel_index(
((1 - dist / dist.max()) * imprint).argmax(), imprint.shape
)
# Show the source name associated to the imprint
ax.text(
ground_granularity[max_x],
ground_granularity[max_y],
f" {src}",
color="#b35900",
fontweight="bold",
va="center",
ha="center",
zorder=30,
)
# NenuFAR mini-array positions
ax.scatter(ma_enu[:, 0], ma_enu[:, 1], 20, color="black", zorder=10)
for i in range(ma_enu.shape[0]):
ax.text(
ma_enu[i, 0],
ma_enu[i, 1],
f" {self.mini_arrays[i]}",
color="black",
zorder=10,
)
# ax.scatter(
# building_enu[:, 0],
# building_enu[:, 1],
# 20,
# color="tab:red",#'tab:orange',
# zorder=10
# )
# Plot axis labels
ax.set_xlabel(r"$\Delta x$ (m)")
ax.set_ylabel(r"$\Delta y$ (m)")
ax.set_title(
f"{np.mean(self.frequency.to(u.MHz).value):.3f} MHz -- {self.time.isot}"
)
# Save or show the figure
if figname != "":
plt.savefig(figname, dpi=300, bbox_inches="tight", transparent=True)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close("all")
# ============================================================= #
# ------------------------- NenufarTV ------------------------- #
[docs]
class NenufarTV(StatisticsData, Crosslet):
"""Crosslet abstract class (both for XST and NenuFAR TV dat files).
.. rubric:: Attributes Summary
.. autosummary::
~StatisticsData.frequencies
.. rubric:: Methods Summary
.. autosummary::
~Crosslet.get
~Crosslet.get_stokes
~Crosslet.get_beamform
~NenufarTV.compute_nenufar_tv
~NenufarTV.compute_nearfield_tv
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self, file_name):
self.file_name = file_name
self.mini_arrays = None
self.time = None
self.dt = None
self.frequencies = None
self.data = None
self.load_tv_data()
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def compute_nenufar_tv(self, analog_pointing_file: str = None, **kwargs):
"""
kwargs
fov_radius
resolution
stokes
"""
obs_time = self.time[0] + (self.time[-1] - self.time[0]) / 2
fov_radius = kwargs.get("fov_radius", 27 * u.deg)
resolution = kwargs.get("resolution", 0.5 * u.deg)
stokes = kwargs.get("stokes", "I")
if analog_pointing_file is None:
phase_center_altaz = SkyCoord(
0,
90,
unit="deg",
frame=AltAz(obstime=obs_time, location=nenufar_position),
)
else:
pointing = Pointing.from_file(
analog_pointing_file, include_corrections=False
)[obs_time.reshape((1,))]
phase_center_altaz = pointing.custom_ho_coordinates[0]
data = self.get_stokes(stokes)
sky_image = data.make_image(
resolution=resolution,
fov_radius=fov_radius,
phase_center=altaz_to_radec(phase_center_altaz),
)
return TV_Image(
tv_image=sky_image,
analog_pointing=phase_center_altaz,
fov_radius=fov_radius,
)
[docs]
def compute_nearfield_tv(self, sources: list = [], **kwargs):
"""
kwargs
stokes
radius
npix
:Example:
from nenupy.io.xst import NenufarTV
tv = NenufarTV("20191204_132113_nenufarTV.dat")
nf_object = tv.compute_nearfield_tv(
sources=["Cyg A", "Cas A", "Vir A", "Tau A", "Sun"],
npix=64
)
"""
obs_time = self.time[0] + (self.time[-1] - self.time[0]) / 2
stokes = kwargs.get("stokes", "I")
radius = kwargs.get("radius", 400 * u.m)
npix = kwargs.get("npix", 64)
data = self.get_stokes(stokes)
nf, src_imprints = data.make_nearfield(
radius=radius, npix=npix, sources=sources
)
return TV_Nearfield(
nearfield=nf,
source_imprints=src_imprints,
npix=npix,
time=obs_time,
frequency=np.mean(self.frequencies),
radius=radius,
mini_arrays=data.mini_arrays,
stokes=stokes,
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
[docs]
def load_tv_data(self):
""" """
# Extract the ASCII header (5 first lines)
with open(self.file_name, "rb") as f:
header = list(islice(f, 0, 5))
assert header[0] == b"HeaderStart\n", "Wrong header start"
assert header[-1] == b"HeaderStop\n", "Wrong header stop"
header = [s.decode("utf-8") for s in header]
hd_size = sum([len(s) for s in header])
# Parse informations into Crosslet attributes
keys = ["frequencies", "mini_arrays", "dt"]
search = ["Freq.List", "Mr.List", "accumulation"]
types = ["float64", "int", "int"]
for key, word, typ in zip(keys, search, types):
unit = u.MHz if key == "freqs" else 1
for h in header:
if word in h:
setattr(
self,
key,
np.array(h.split("=")[1].split(","), dtype=typ) * unit,
)
# Deduce the dtype for decoding
n_ma = self.mini_arrays.size
n_sb = self.frequencies.size
dtype = np.dtype(
[("jd", "float64"), ("data", "complex64", (n_sb, n_ma * n_ma * 2 + n_ma))]
)
# Decoding the binary file
tmp = np.memmap(filename=self.file_name, dtype="int8", mode="r", offset=hd_size)
decoded = tmp.view(dtype)
self.dt = TimeDelta(self.dt, format="sec")
self.frequencies *= u.MHz
self.data = decoded["data"] / self.dt.sec
self.time = Time(decoded["jd"], format="jd", precision=0)