#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
*************
NenuFAR Tools
*************
"""
__author__ = "Alan Loh"
__copyright__ = "Copyright 2021, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
"freq2sb",
"sb2freq",
"instrument_temperature",
"miniarrays_rotated_like",
"read_cal_table",
"generate_nenufar_subarrays",
"lofar_instrument_temperature",
"pointing_correction"
]
import numpy as np
import astropy.units as u
from typing import List, Tuple
from os.path import join, dirname
from astropy.coordinates import Latitude, Longitude, SkyCoord, AltAz
from scipy.io import readsav
from scipy.interpolate import RegularGridInterpolator
from nenupy.instru import lna_gain, nenufar_miniarrays
from nenupy.astro.astro_tools import sky_temperature
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ------------------------- freq2sb --------------------------- #
# ============================================================= #
[docs]
def freq2sb(frequency: u.Quantity):
r""" Conversion between the frequency :math:`\nu` and the
NenuFAR sub-band index :math:`n_{\rm SB}`.
Each NenuFAR sub-band has a bandwidth of
:math:`\Delta \nu = 195.3125\, \rm{kHz}`:
.. math::
n_{\rm SB} = \lfloor*{ \frac{\nu}{\Delta \nu} + \frac{1}{2} \rfloor
:param frequency:
Frequency to convert in sub-band index.
:type frequency:
:class:`~astropy.units.Quantity`
:returns:
Sub-band index, same shape as ``frequency``.
:rtype:
`int` or :class:`~numpy.ndarray`
:example:
.. code-block:: python
from nenupy.instru import freq2sb
import astropy.units as u
freq2sb(frequency=50.5*u.MHz)
freq2sb(frequency=[50.5, 51]*u.MHz)
"""
if not isinstance(frequency, u.Quantity):
raise TypeError(
f"`frequency` - {u.Quantity} expected."
)
if (frequency.min() < 0 * u.MHz) or (frequency.max() > 100 * u.MHz):
raise ValueError(
"'frequency' should be between 0 and 100 MHz."
)
frequency = frequency.to(u.MHz)
sb_width = 100. * u.MHz / 512
sb_idx = np.floor(frequency/sb_width + 0.5)
return sb_idx.astype(int).value
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- freq2sb --------------------------- #
# ============================================================= #
[docs]
def sb2freq(subband):
r""" Conversion between NenuFAR sub-band index :math:`n_{\rm SB}`
to sub-band starting frequency :math:`\nu_{\rm start}`.
.. math::
\nu_{\rm start} = n_{\rm SB} \times \Delta \nu
Each NenuFAR sub-band has a bandwidth of
:math:`\Delta \nu = 195.3125\, \rm{kHz}`, therefore, the
sub-band :math:`n_{\rm SB}` goes from :math:`\nu_{\rm start}`
to :math:`\nu_{\rm stop} = \nu_{\rm start} + \Delta \nu`.
:param subband:
Sub-band index (from 0 to 511).
:type subband:
`int` or :class:`~numpy.ndarray` of `int`
:returns:
Sub-band start frequency :math:`\nu_{\rm start}` in MHz.
:rtype:
:class:`~astropy.units.Quantity`
:Example:
.. code-block:: python
from nenupy.instru import sb2freq
sb2freq(subband=1)
sb2freq(subband=[1, 2, 3, 4])
"""
if np.isscalar(subband):
subband = np.array([subband])
else:
subband = np.array(subband)
if subband.dtype.name not in ['int32', 'int64']:
raise TypeError(
"`subband` - Integer(s) expected."
)
if (subband.min() < 0) or (subband.max() > 511):
raise ValueError(
"`subband` - Values should be between 0 and 511."
)
sb_width = 100.*u.MHz/512 #200e6*192/1024# + 195312.5/2
freq_start = subband*sb_width - sb_width/2
return freq_start
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------ instrument_temperature ------------------- #
# ============================================================= #
[docs]
def instrument_temperature(frequency: u.Quantity = 50*u.MHz, lna_filter: int = 0) -> u.Quantity:
""" Instrument temperature at a given ``frequency``.
This depends on the `Low Noise Amplifier <https://nenufar.obs-nancay.fr/en/astronomer/#antennas>`_
characteristics.
:param frequency:
Frequency at which computing the instrument temperature.
Default is ``50 MHz``.
:type frequency:
:class:`~astropy.units.Quantity`
:param lna_filter:
Local Noise Amplifier high-pass filter selection.
Available values are ``0, 1, 2, 3``.
They correspond to minimal frequencies ``10, 15, 20, 25 MHz`` respectively.
Default is ``0``, i.e., 10 MHz filter.
:type lna_filter:
`int`
:returns:
Instrument temperature in Kelvins
:rtype:
:class:`~astropy.units.Quantity`
.. warning::
For the time being, only ``lna_filter`` values ``0`` and ``3`` are available.
:Example:
.. code-block:: python
from nenupy.instru import instrument_temperature
import astropy.units as u
instrument_temperature(frequency=70*u.MHz)
.. seealso::
:func:`~nenupy.astro.astro_tools.sky_temperature`
"""
# Available filters
filters = {0: "no_filter", 3: "25mhz_filter"}
# lna_gain represents T_ins/T_sky - measured
lna = lna_gain[filters[lna_filter]]
# Multiply lna_gain (interpolatd at the desired frequencies) by T_sky to get T_ins
t_sky = sky_temperature(frequency=frequency)
t_inst = t_sky * np.interp(frequency, lna["frequency"]*u.MHz, lna["gain"])
return t_inst
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------ miniarrays_rotated_like ------------------ #
# ============================================================= #
[docs]
def miniarrays_rotated_like(rotations: List[int] = [0]) -> np.ndarray:
r""" Returns the Mini-Array indices whose rotations match the ``rotations`` argument.
A :math:`60^{\circ}` modulo is automatically applied to all rotation parameters.
:param rotations:
Mini-Array rotation(s) to select.
A ``ValueError`` is raised if the values are not integers and/or if they are not multiples of 10.
:type rotations:
`list`[`int`]
:returns:
Mini-Array indices.
:rtype:
:class:`~numpy.ndarray`
:Example:
.. code-block:: python
from nenupy.instru import miniarrays_rotated_like
miniarrays_rotated_like([10])
"""
# Check that the rotation format is correct
if not all([rot%10 == 0 for rot in rotations]):
raise ValueError(
f"Syntax error: miniarray_rotations={rotations}. It should be a list of integers, multiples of 10."
)
ma_rotations = np.array([nenufar_miniarrays[ma]["rotation"] for ma in nenufar_miniarrays])%60
ma_indices = np.array([nenufar_miniarrays[ma]["id"] for ma in nenufar_miniarrays])
return ma_indices[np.isin(ma_rotations, np.array(rotations)%60)]
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ---------------------- read_cal_table ----------------------- #
# ============================================================= #
[docs]
def read_cal_table(calibration_file: str = None, n_mini_arrays: int = 96) -> np.ndarray:
""" Reads NenuFAR antenna delays calibration file.
:param calibration_file:
Name of the calibration file to read. If ``None`` or
``'default'`` the standard calibration file is read.
:type calibration_file: `str`
:param n_mini_arrays:
Number of Mini-Arrays recorded in the table
:type n_mini_arrays: `int`
:returns:
Antenna delays shaped as
(frequency, mini-arrays, polarizations).
:rtype: :class:`~numpy.ndarray`
"""
if (calibration_file is None) or (calibration_file.lower() == "default"):
calibration_file = join(
dirname(__file__),
"cal_pz_2_multi_2019-02-23.dat",
)
if n_mini_arrays != 96:
log.warning("The residual delay calibration table should contain 96 MAs. Maybe there's a hidden mistake.")
with open(calibration_file, "rb") as f:
log.info(
"Loading calibration table {}".format(
calibration_file
)
)
header = []
while True:
line = f.readline()
header.append(line)
if line.startswith(b"HeaderStop"):
hd_size = sum([len(s) for s in header])
break
elif not line:
# No header?
hd_size = 0
break
dtype = np.dtype(
[
("data", "float64", (512, n_mini_arrays, 2, 2))
]
)
tmp = np.memmap(
filename=calibration_file,
dtype="int8",
mode="r",
offset=hd_size
)
try:
decoded = tmp.view(dtype)[0]["data"]
except ValueError:
log.error("Try another value of n_mini_arrays")
raise
data = decoded[..., 0] + 1.j*decoded[..., 1]
return data
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ---------------- generate_nenufar_subarrays ----------------- #
# ============================================================= #
[docs]
def generate_nenufar_subarrays(
n_subarrays: int = 2,
include_remote_mas: bool = False
):
""" Generates NenuFAR sub-arrays of Mini-Arrays. The sub-arraying
is done completely randomly.
:param n_subarrays:
Number of sub-arrays to generate from the NenuFAR
Mini-Array distribution. Default is `2`.
:type n_subarrays:
`int`
:param include_remote_mas:
Include or not the remote Mini-Arrays.
:type include_remote_mas:
`bool`
:returns:
A list of Mini-Array names for each sub-array.
:rtype:
`list` of `~numpy.ndarray`
:Example:
.. code-block:: python
from nenupy.instru import generate_nenufar_subarrays
from nenupy.instru import NenuFAR
sub_arrays = generate_nenufar_subarrays(n_subarrays=2)
sub_array_1 = NenuFAR()[sub_arrays[0]]
sub_array_2 = NenuFAR()[sub_arrays[1]]
"""
if n_subarrays < 2:
raise ValueError(
"`n_subarrays` should be at least equal to 2."
)
# Number of Mini-Arrays
n_mas = 96
if include_remote_mas:
n_mas = 102
# Mini-Arrays names
ma_names = np.array([ma_name for ma_name in nenufar_miniarrays.keys()])
ma_names = ma_names[:n_mas]
# Generate random values
rng = np.random.default_rng()
ma_values = rng.random(n_mas) # floats between [0, 1[
# Create mask by evaluating the random value per MA
edge_values = np.linspace(0, 1, n_subarrays + 1)
subarray_masks = (edge_values[:-1][:, None] <= ma_values[None, :]) & (ma_values[None, :] < edge_values[1:][:, None])
# Check that each Mini-Array is counted once
if not np.all(np.sum(subarray_masks, axis=0)==1):
raise ValueError("Something's wrong.")
# Return a list of sub-arrays (numpy arrays of Mini-Array names)
return [ma_names[subarray_masks[i]] for i in range(n_subarrays)]
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------- lofar_instrument_temperature ---------------- #
# ============================================================= #
[docs]
def lofar_instrument_temperature(frequency: u.Quantity) -> u.Quantity:
"""_summary_
From Vlad Kondratiev lofar_tinst.py (polynomial fit on Wijnholds (2011))
Parameters
----------
frequency : u.Quantity
_description_
Returns
-------
u.Quantity
_description_
"""
lba_mask = frequency <= 90*u.MHz
hba_mask = frequency >= 110*u.MHz
t_inst_poly_fit_coeff_lba = np.array(
[
6.2699888333e-05,
-0.019932340239,
2.60625093843,
-179.560314268,
6890.14953844,
-140196.209123,
1189842.07708
]
)
t_inst_poly_fit_coeff_hba = np.array(
[
6.64031379234e-08,
-6.27815750717e-05,
0.0246844426766,
-5.16281033712,
605.474082663,
-37730.3913315,
975867.990312
]
)
t_inst_poly_fit_lba = np.poly1d(t_inst_poly_fit_coeff_lba)
t_inst_poly_fit_hba = np.poly1d(t_inst_poly_fit_coeff_hba)
instrument_temperature = np.empty(frequency.shape)
instrument_temperature[lba_mask] = t_inst_poly_fit_lba(frequency[lba_mask].to_value(u.MHz))
instrument_temperature[hba_mask] = t_inst_poly_fit_hba(frequency[hba_mask].to_value(u.MHz))
instrument_temperature[~(lba_mask + hba_mask)] = np.nan
return instrument_temperature * u.K
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------- mini_array_analog_pointing_grid -------------- #
# ============================================================= #
[docs]
def mini_array_analog_pointing_grid(ma_rotation: u.Quantity = 0 * u.deg) -> Tuple[Longitude, Latitude]:
"""Compute the grod of analog pointing for a given Mini-Array rotation.
Example
-------
.. code-block:: python
>>> import matplotlib.pyplot as plt
>>> fig = plt.figure(figsize=(10, 10))
>>> ax = fig.add_subplot(projection="polar")
>>> ax.set_rlim(90, 0)
>>> ma_rot = 0 * u.deg
>>> grid = SkyCoord(*mini_array_analog_pointing_grid(ma_rotation=ma_rot))
>>> ax.scatter(grid.ra.rad, grid.dec.deg, 0.5)
Parameters
----------
ma_rotation : :class:`~astropy.units.Quantity`, optional
Rotation of the Minni-Array, by default `0*u.deg`
Returns
-------
Tuple[:class:`~astropy.coordinates.Longitude`, :class:`~astropy.coordinates.Latitude`]
Horizontal coordinates of the pointing grid.
"""
dx = 2 * 5.5
# DY = dx * np.cos(np.pi / 6)
dmin_x = 0.165
# DMINY = dmin_x * np.cos(np.pi / 6)
dmin_d = dmin_x / dx
n_bits = 7
bits = 2**(n_bits - 1)
bits_array = np.arange(2*bits)
xx, yy = np.meshgrid(bits_array, bits_array)
xx_mask = xx >= 64
yy_mask = yy >= 64
k1 = (xx - bits + 1) * dmin_d
k2 = (bits - 1 - yy) * dmin_d
k1[xx_mask] = (xx[xx_mask] - bits) * dmin_d
k2[yy_mask] = (bits - yy[yy_mask]) * dmin_d
# theta = 0.5*np.arccos(1 - 2*(k1**2 + k2**2))
with np.errstate(invalid="ignore"):
theta = np.pi / 2 - (0.5 * np.arccos(1 - 2 * (k1**2 + k2**2)))
bad_values = np.isnan(theta)
# phi = np.arctan2(k2, k1) + np.pi
phi = np.pi / 2 - (np.arctan2(k2, k1) + np.pi) + ma_rotation.to_value("rad")
theta[bad_values] = - np.pi / 2
phi[bad_values] = 0.
return (
Longitude(phi, unit="rad"),
Latitude(theta, unit="rad"),
)
[docs]
def mini_array_pointing_order(coordinates: SkyCoord, ma_rotation: u.Quantity = 0 * u.deg, wrong_implementation: bool = False) -> np.ndarray:
if coordinates.ndim > 1:
raise Exception("coordinates can only have a single dimension.")
elif coordinates.ndim == 0:
coordinates = coordinates.reshape((1,))
ma_pointing_grid_lon, ma_pointing_grid_lat = mini_array_analog_pointing_grid(ma_rotation=ma_rotation)
# Find out the angular separation between coordinates and the ma pointing grid
# The resulting separation shape will be (n_coordinates, 128, 128)
def separation_vincenty(lon1, lat1, lon2, lat2):
"""https://en.wikipedia.org/wiki/Great-circle_distance"""
lon_diff = np.abs(lon2 - lon1)
cos_lon_diff = np.cos(lon_diff)
sin_lon_diff = np.sin(lon_diff)
cos_lat1 = np.cos(lat1)
sin_lat1 = np.sin(lat1)
cos_lat2 = np.cos(lat2)
sin_lat2 = np.sin(lat2)
part_1 = np.sqrt( (cos_lat2 * sin_lon_diff)**2 + (cos_lat1 * sin_lat2 - sin_lat1 * cos_lat2 * cos_lon_diff)**2)
part_2 = sin_lat1 * sin_lat2 + cos_lat1 * cos_lat2 * cos_lon_diff
dist_rad = np.atan2(part_1, part_2)
return dist_rad
angular_sperarations = separation_vincenty(
lon1=coordinates.ra.rad[:, None, None],
lat1=coordinates.dec.rad[:, None, None],
lon2=ma_pointing_grid_lon.rad[None, :, :],
lat2=ma_pointing_grid_lat.rad[None, :, :]
)
sep_shape = angular_sperarations.shape
order = np.array(
np.unravel_index(
np.argmin(
angular_sperarations.reshape(
(sep_shape[0], sep_shape[1] * sep_shape[2])
),
axis=1
),
sep_shape[1:],
)
)
# Correct for order #64 which is identical to # 63
if wrong_implementation:
order[order >= 64] -= 1
else:
order[order == 64] -= 1
return order.T
[docs]
def mini_array_pointing_coordinates(orders: np.ndarray, ma_rotation: u.Quantity = 0 * u.deg) -> SkyCoord:
if orders.ndim != 2:
raise Exception("orders must be 2D")
elif orders.shape[1] != 2:
raise Exception("orders must have its 2nd dimension of size 2")
ma_pointing_grid_lon, ma_pointing_grid_lat = mini_array_analog_pointing_grid(ma_rotation=ma_rotation)
return SkyCoord(
ma_pointing_grid_lon[orders[:, 0], orders[:, 1]],
ma_pointing_grid_lat[orders[:, 0], orders[:, 1]]
)
# ============================================================= #
# -------------------- pointing_correction -------------------- #
[docs]
def pointing_correction(altaz_coordinates: SkyCoord, correction_year: str = "2022") -> SkyCoord:
"""Modify horizontal coordinates by the empirical pointing correction.
NenuFAR used to suffer from a pointing issue due to the coordinates frame upon which the beamforming delays were computed.
Although this issue disappeared on 2025 June 17th 12:00 UTC, two instances of pointing corrections were derived.
Their goal was to compensate for the angular shift that was measured.
Two sets of correction files were applied, and the year could be specified by `correction_year`.
Parameters
----------
altaz_coordinates : :class:`~astropy.coordinates.SkyCoord`
Sky coordinates in horizontal frame (i.e. :class:`~astropy.coordinates.AltAz` frame) to be corrected for.
correction_year : `str`, optional
Correction year (either "2019", "2022" or "none") if "none" or anything else, no correction is applied, by default "2022".
Returns
-------
:class:`~astropy.coordinates.SkyCoord`
The corrected coordinates in horizontal frame.
Examples
--------
.. code-block:: python
>>> from nenupy.instru.instrument_tools import pointing_correction
>>> from astropy.coordinates import SkyCoord, AltAz
>>> from astropy.time import Time
>>> from nenupy import nenufar_position
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> azs = np.linspace(0, 360, 200)
>>> alts = np.linspace(0, 90, 200)
>>> azimuths, elevations = np.meshgrid(azs, alts)
>>> coords = SkyCoord(
>>> azimuths,
>>> elevations,
>>> unit="deg",
>>> frame=AltAz(
>>> obstime=Time("2025-06-17 12:00:00"),
>>> location=nenufar_position
>>> )
>>> )
>>> coords_corrected = pointing_correction(coords, "2022")
>>> fig = plt.figure(figsize=(7, 7))
>>> ax = fig.add_subplot(projection="polar")
>>> im = ax.pcolormesh(np.radians(azs), alts, coords_corrected.separation(coords).deg)
>>> ax.set_rlim(90, 0)
>>> ax.set_theta_zero_location("N")
>>> cb = fig.colorbar(im, fraction=0.045, pad=0.08)
>>> cb.set_label("Angular separation (deg)")
.. figure:: ../_images/instru_images/pointing_correction.png
:width: 650
:align: center
"""
# Check that the coordinates or in horizontal frame.
if not isinstance(altaz_coordinates.frame, AltAz):
raise ValueError("Input coordinates are expected to be expressed on an AltAz frame.")
# Do not take into account elevations less than 15 deg (c.f. SAV files)
low_alt_mask = altaz_coordinates.alt.deg > 15
# Correction file selection
if correction_year == "2022":
correction_file = join(
dirname(__file__),
"cor_azel_polys_2022.sav",
)
elif correction_year == "2019":
correction_file = join(
dirname(__file__),
"cor_azel_mix.sav",
)
elif correction_year.lower() == "none":
return altaz_coordinates
else:
log.warning("Available corrections are '2019' or '2022' or 'none'. Setting 'none' instead.")
return altaz_coordinates
# Read the IDL Sav file
correction_data = readsav(correction_file)
# Parse data and axes
delta_az_data = correction_data["daz_cor"]
delta_alt_data = correction_data["del_cor"]
az_vector = np.linspace(0, 360, 361)
alt_vector = np.linspace(15, 90, 76)
# Define the interpolation functions to access the whole range of coordinates
delta_az_func = RegularGridInterpolator((alt_vector, az_vector), delta_az_data)
delta_alt_func = RegularGridInterpolator((alt_vector, az_vector), delta_alt_data)
# Compute the delta azimuth and elevation
coords = np.array([altaz_coordinates[low_alt_mask].alt.deg, altaz_coordinates[low_alt_mask].az.deg]).T
delta_az = np.zeros(low_alt_mask.shape)
delta_az[low_alt_mask] = delta_az_func(coords)
delta_alt = np.zeros(low_alt_mask.shape)
delta_alt[low_alt_mask] = delta_alt_func(coords)
# Return the modified set of coordinates
new_coordinates = SkyCoord(
altaz_coordinates.az + delta_az / 60 * u.deg,
altaz_coordinates.alt + delta_alt / 60 * u.deg,
frame=altaz_coordinates.frame
)
return new_coordinates