#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
*************
NenuFAR Tools
*************
"""
__author__ = "Alan Loh"
__copyright__ = "Copyright 2021, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
"freq2sb",
"sb2freq",
"instrument_temperature",
"miniarrays_rotated_like",
"read_cal_table",
"generate_nenufar_subarrays"
]
import numpy as np
import astropy.units as u
from typing import List
from os.path import join, dirname
from scipy.interpolate import interp2d
from nenupy.instru import lna_gain
from nenupy.astro.astro_tools import sky_temperature
from nenupy.instru import nenufar_miniarrays
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ------------------------- freq2sb --------------------------- #
# ============================================================= #
[docs]
def freq2sb(frequency: u.Quantity):
r""" Conversion between the frequency :math:`\nu` and the
NenuFAR sub-band index :math:`n_{\rm SB}`.
Each NenuFAR sub-band has a bandwidth of
:math:`\Delta \nu = 195.3125\, \rm{kHz}`:
.. math::
n_{\rm SB} = \lfloor*{ \frac{\nu}{\Delta \nu} + \frac{1}{2} \rfloor
:param frequency:
Frequency to convert in sub-band index.
:type frequency:
:class:`~astropy.units.Quantity`
:returns:
Sub-band index, same shape as ``frequency``.
:rtype:
`int` or :class:`~numpy.ndarray`
:example:
.. code-block:: python
from nenupy.instru import freq2sb
import astropy.units as u
freq2sb(frequency=50.5*u.MHz)
freq2sb(frequency=[50.5, 51]*u.MHz)
"""
if not isinstance(frequency, u.Quantity):
raise TypeError(
f"`frequency` - {u.Quantity} expected."
)
if (frequency.min() < 0 * u.MHz) or (frequency.max() > 100 * u.MHz):
raise ValueError(
"'frequency' should be between 0 and 100 MHz."
)
frequency = frequency.to(u.MHz)
sb_width = 100. * u.MHz / 512
sb_idx = np.floor(frequency/sb_width + 0.5)
return sb_idx.astype(int).value
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- freq2sb --------------------------- #
# ============================================================= #
[docs]
def sb2freq(subband):
r""" Conversion between NenuFAR sub-band index :math:`n_{\rm SB}`
to sub-band starting frequency :math:`\nu_{\rm start}`.
.. math::
\nu_{\rm start} = n_{\rm SB} \times \Delta \nu
Each NenuFAR sub-band has a bandwidth of
:math:`\Delta \nu = 195.3125\, \rm{kHz}`, therefore, the
sub-band :math:`n_{\rm SB}` goes from :math:`\nu_{\rm start}`
to :math:`\nu_{\rm stop} = \nu_{\rm start} + \Delta \nu`.
:param subband:
Sub-band index (from 0 to 511).
:type subband:
`int` or :class:`~numpy.ndarray` of `int`
:returns:
Sub-band start frequency :math:`\nu_{\rm start}` in MHz.
:rtype:
:class:`~astropy.units.Quantity`
:Example:
.. code-block:: python
from nenupy.instru import sb2freq
sb2freq(subband=1)
sb2freq(subband=[1, 2, 3, 4])
"""
if np.isscalar(subband):
subband = np.array([subband])
else:
subband = np.array(subband)
if subband.dtype.name not in ['int32', 'int64']:
raise TypeError(
"`subband` - Integer(s) expected."
)
if (subband.min() < 0) or (subband.max() > 511):
raise ValueError(
"`subband` - Values should be between 0 and 511."
)
sb_width = 100.*u.MHz/512 #200e6*192/1024# + 195312.5/2
freq_start = subband*sb_width - sb_width/2
return freq_start
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------ instrument_temperature ------------------- #
# ============================================================= #
[docs]
def instrument_temperature(frequency: u.Quantity = 50*u.MHz, lna_filter: int = 0) -> u.Quantity:
""" Instrument temperature at a given ``frequency``.
This depends on the `Low Noise Amplifier <https://nenufar.obs-nancay.fr/en/astronomer/#antennas>`_
characteristics.
:param frequency:
Frequency at which computing the instrument temperature.
Default is ``50 MHz``.
:type frequency:
:class:`~astropy.units.Quantity`
:param lna_filter:
Local Noise Amplifier high-pass filter selection.
Available values are ``0, 1, 2, 3``.
They correspond to minimal frequencies ``10, 15, 20, 25 MHz`` respectively.
Default is ``0``, i.e., 10 MHz filter.
:type lna_filter:
`int`
:returns:
Instrument temperature in Kelvins
:rtype:
:class:`~astropy.units.Quantity`
.. warning::
For the time being, only ``lna_filter`` values ``0`` and ``3`` are available.
:Example:
.. code-block:: python
from nenupy.instru import instrument_temperature
import astropy.units as u
instrument_temperature(frequency=70*u.MHz)
.. seealso::
:func:`~nenupy.astro.astro_tools.sky_temperature`
"""
# Available filters
filters = {0: "no_filter", 3: "25mhz_filter"}
# lna_gain represents T_ins/T_sky - measured
lna = lna_gain[filters[lna_filter]]
# Multiply lna_gain (interpolatd at the desired frequencies) by T_sky to get T_ins
t_sky = sky_temperature(frequency=frequency)
t_inst = t_sky * np.interp(frequency, lna["frequency"]*u.MHz, lna["gain"])
return t_inst
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------ miniarrays_rotated_like ------------------ #
# ============================================================= #
[docs]
def miniarrays_rotated_like(rotations: List[int] = [0]) -> np.ndarray:
r""" Returns the Mini-Array indices whose rotations match the ``rotations`` argument.
A :math:`60^{\circ}` modulo is automatically applied to all rotation parameters.
:param rotations:
Mini-Array rotation(s) to select.
A ``ValueError`` is raised if the values are not integers and/or if they are not multiples of 10.
:type rotations:
`list`[`int`]
:returns:
Mini-Array indices.
:rtype:
:class:`~numpy.ndarray`
:Example:
.. code-block:: python
from nenupy.instru import miniarrays_rotated_like
miniarrays_rotated_like([10])
"""
# Check that the rotation format is correct
if not all([rot%10 == 0 for rot in rotations]):
raise ValueError(
f"Syntax error: miniarray_rotations={rotations}. It should be a list of integers, multiples of 10."
)
ma_rotations = np.array([nenufar_miniarrays[ma]["rotation"] for ma in nenufar_miniarrays])%60
ma_indices = np.array([nenufar_miniarrays[ma]["id"] for ma in nenufar_miniarrays])
return ma_indices[np.isin(ma_rotations, np.array(rotations)%60)]
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ---------------------- read_cal_table ----------------------- #
# ============================================================= #
[docs]
def read_cal_table(calibration_file: str = None) -> np.ndarray:
""" Reads NenuFAR antenna delays calibration file.
:param calibration_file:
Name of the calibration file to read. If ``None`` or
``'default'`` the standard calibration file is read.
:type calibration_file: `str`
:returns:
Antenna delays shaped as
(frequency, mini-arrays, polarizations).
:rtype: :class:`~numpy.ndarray`
"""
if (calibration_file is None) or (calibration_file.lower() == "default"):
calibration_file = join(
dirname(__file__),
'cal_pz_2_multi_2019-02-23.dat',
)
with open(calibration_file, 'rb') as f:
log.info(
"Loading calibration table {}".format(
calibration_file
)
)
header = []
while True:
line = f.readline()
header.append(line)
if line.startswith(b"HeaderStop"):
break
hd_size = sum([len(s) for s in header])
dtype = np.dtype(
[
('data', 'float64', (512, 96, 2, 2))
]
)
tmp = np.memmap(
filename=calibration_file,
dtype="int8",
mode="r",
offset=hd_size
)
decoded = tmp.view(dtype)[0]["data"]
data = decoded[..., 0] + 1.j*decoded[..., 1]
return data
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ---------------- generate_nenufar_subarrays ----------------- #
# ============================================================= #
[docs]
def generate_nenufar_subarrays(
n_subarrays: int = 2,
include_remote_mas: bool = False
):
""" Generates NenuFAR sub-arrays of Mini-Arrays. The sub-arraying
is done completely randomly.
:param n_subarrays:
Number of sub-arrays to generate from the NenuFAR
Mini-Array distribution. Default is `2`.
:type n_subarrays:
`int`
:param include_remote_mas:
Include or not the remote Mini-Arrays.
:type include_remote_mas:
`bool`
:returns:
A list of Mini-Array names for each sub-array.
:rtype:
`list` of `~numpy.ndarray`
:Example:
.. code-block:: python
from nenupy.instru import generate_nenufar_subarrays
from nenupy.instru import NenuFAR
sub_arrays = generate_nenufar_subarrays(n_subarrays=2)
sub_array_1 = NenuFAR()[sub_arrays[0]]
sub_array_2 = NenuFAR()[sub_arrays[1]]
"""
if n_subarrays < 2:
raise ValueError(
"`n_subarrays` should be at least equal to 2."
)
# Number of Mini-Arrays
n_mas = 96
if include_remote_mas:
n_mas = 102
# Mini-Arrays names
ma_names = np.array([ma_name for ma_name in nenufar_miniarrays.keys()])
ma_names = ma_names[:n_mas]
# Generate random values
rng = np.random.default_rng()
ma_values = rng.random(n_mas) # floats between [0, 1[
# Create mask by evaluating the random value per MA
edge_values = np.linspace(0, 1, n_subarrays + 1)
subarray_masks = (edge_values[:-1][:, None] <= ma_values[None, :]) & (ma_values[None, :] < edge_values[1:][:, None])
# Check that each Mini-Array is counted once
if not np.all(np.sum(subarray_masks, axis=0)==1):
raise ValueError("Something's wrong.")
# Return a list of sub-arrays (numpy arrays of Mini-Array names)
return [ma_names[subarray_masks[i]] for i in range(n_subarrays)]
# ============================================================= #
# ============================================================= #