#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
*********************
NenuFAR Array Classes
*********************
.. inheritance-diagram:: nenupy.instru.nenufar.MiniArray nenupy.instru.nenufar.NenuFAR
:parts: 3
.. autosummary::
~MiniArray
~NenuFAR
"""
__author__ = "Alan Loh"
__copyright__ = "Copyright 2021, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
"NenuFAR_Configuration",
"Polarization",
"MiniArray",
"NenuFAR"
]
from functools import lru_cache
import logging
log = logging.getLogger(__name__)
import numpy as np
import astropy.units as u
from astropy.time import Time, TimeDelta
from astropy.coordinates import EarthLocation, SkyCoord, AltAz
from pyproj import Transformer
from nenupy import nenufar_position
from nenupy.instru import (
nenufar_miniarrays,
miniarray_antennas,
squint_table,
instrument_temperature
)
from nenupy.instru.interferometer import Interferometer
from nenupy.astro.astro_tools import radec_to_altaz
from nenupy.astro.sky import Sky
from nenupy.astro.pointing import Pointing
# ============================================================= #
# ---------------- Polarization / Antenna Gain ---------------- #
# ============================================================= #
import healpy as hp
#from scipy.interpolate import interp2d
from scipy.interpolate import BarycentricInterpolator#RegularGridInterpolator
import os
from enum import Enum
class _AntennaGain:
""" NenuFAR antenna gain class. """
def __init__(self, polarization: str = 'NW'):
self.polarization = polarization
if self.polarization == 'NE':#'NW':
fields = np.arange(8)
elif self.polarization == 'NW':#'NE':
fields = 8 + np.arange(8)
else:
raise Exception(f"Polarization '{self.polarization}' unknown.")
# Read the gain
gain = hp.read_map(
filename=os.path.join(os.path.dirname(__file__), './NenuFAR_Ant_Hpx.fits'),
hdu=1,
field=fields,
memmap=True,
dtype=float
)
gain /= gain.max()
# Interpolate the antenna gain on the frequency axis
freqs = np.arange(10, 90, 10)
self.healpix_coords = np.arange(hp.pixelfunc.nside2npix(64))
# self.interpolated_gain = interp2d(
# x=self.healpix_coords,
# y=freqs,
# z=gain,
# kind='linear'
# )
# self.interpolated_gain = RegularGridInterpolator(
# points=(freqs, self.healpix_coords),
# values=gain,
# method="linear"
# )
self.interpolated_gain = BarycentricInterpolator(
xi=freqs,
yi=gain,
axis=0
)
log.info(f'NenuFAR antenna model (polarization={polarization}) loaded.')
@lru_cache(maxsize=1)
def __getitem__(self, sky: Sky) -> np.ndarray:
""" Return an antenna gain array shaped like (sky.time, sky.frequency, sky.coord)
"""
horizontal_coordinates = sky.horizontal_coordinates
log.debug(
f"Interpolating NenuFAR antenna response ('{self.polarization}' polarization) "
f"on the given sky (time: {sky.time.size}, freq: {sky.frequency.size}, coord: {horizontal_coordinates.size})."
)
# Get the frequency from the Sky instance
freqs = sky.frequency.to_value(u.MHz)
# Find the interpolated gain at the desired frequency
#gain = self.interpolated_gain(self.healpix_coords, freqs)
#gain = self.interpolated_gain((freqs, self.healpix_coords))
gain = self.interpolated_gain(freqs) # shape: (freq, pix_coord)
# if gain.ndim == 1:
# gain = gain.reshape((1, gain.size))
# Find the interpolated gain at the desired coordinates for each frequency
gain = np.array([
hp.pixelfunc.get_interp_val(
m=gain_i,
theta=horizontal_coordinates.az.deg,
phi=horizontal_coordinates.alt.deg,
nest=False,
lonlat=True
) for gain_i in gain
])
if gain.ndim == 1:
# If only one or less dimension is larger than 1 element, get_interp_val returns a 1D array
# It's then esay to just reshape like the original array (minus the pol) since a single dimension is affected at best
original_shape = sky.value.shape
gain = gain.reshape((original_shape[0], original_shape[1], original_shape[3]))
elif gain.ndim == 2:
# The time dimension is not yet included
gain = gain.reshape((1,) + gain.shape)
# Return something shaped as (time, freq, coord)
if sky.time.size == 1:
return gain
return np.moveaxis(gain, 0, 1) # keep that!
#return gain.reshape((1,) + gain.shape) # use with RegularGridInterpolator
[docs]
class Polarization(Enum):
""" Enumerator of the different available polarizations of NenuFAR. """
NW = _AntennaGain('NW')
NE = _AntennaGain('NE')
[docs]
class NenuFAR_Configuration:
""" """
def __init__(self,
beamsquint_correction: bool = True,
beamsquint_frequency: u.Quantity = 50*u.MHz
):
self.beamsquint_correction = beamsquint_correction
self.beamsquint_frequency = beamsquint_frequency
@property
def beamsquint_frequency(self):
""" """
return self._beamsquint_frequency
@beamsquint_frequency.setter
def beamsquint_frequency(self, freq):
if not isinstance(freq, u.Quantity):
raise TypeError(
"'beamsquint_frequency' should be of type 'astropy.units.Quantity'."
)
if not freq.isscalar:
raise ValueError(
"'beamsquint_frequency' should be scalar."
)
self._beamsquint_frequency = freq
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------ MiniArray class errors ------------------- #
# ============================================================= #
class MiniArrayUnknownIndex(Exception):
""" Error raised when the index doesn't exist. """
def __init__(self,
input_index: int
):
self.input_index = input_index
self.error_message = f"Mini-Array index {self.input_index} does not exist..."
available_mini_arrays = [entry["id"] for entry in nenufar_miniarrays.values()]
self.help = f"Valid Mini-Array indices are: {available_mini_arrays}."
super().__init__(self.help)
def __str__(self):
return f"{self.error_message}\n{self.help}"
class MiniArrayBadIndexFormat(Exception):
""" Error raised when the index cannot be read. """
def __init__(self,
input_index
):
self.input_index = input_index
self.error_message = f"No support for {type(self.input_index)} format as a Mini-Array index..."
self.help = f"The index should be an integer value."
super().__init__(self.help)
def __str__(self):
return f"{self.error_message}\n{self.help}"
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- MiniArray ------------------------- #
# ============================================================= #
[docs]
class MiniArray(Interferometer):
""" Main class to handle a NenuFAR Mini-Array antenna distribution.
.. versionadded:: 2.0.0
:param index:
Mini-Array index. 'Core' Mini-Arrays have indices ranging
from ``0`` to ``95``. 'Remote' Mini-Arrays have indices
ranging from ``100`` to ``105``.
:type index:
`int`
:Example:
Instantiating :class:`~nenupy.instru.nenufar.MiniArray`:
>>> from nenupy.instru import MiniArray
>>> ma = MiniArray(index=0)
Sub-arraying on an existing :class:`~nenupy.instru.nenufar.MiniArray` instance:
>>> sub_ma = ma["Ant01", "Ant06", "Ant11"]
>>> sub_ma.antenna_names
array(['Ant01', 'Ant06', 'Ant11'], dtype='<U5')
Using `slice` object (converted in :class:`~numpy.ndarray` using `~numpy.r_`):
>>> import numpy as np
>>> sub_ma = ma[np.r_[2:10]]
>>> sub_ma.size
8
Combining two :class:`~nenupy.instru.nenufar.MiniArray` instances:
>>> ma1 = MiniArray(index=0)["Ant01", "Ant06"]
>>> ma2 = MiniArray(index=0)["Ant08", "Ant12"]
>>> combined_ma = ma1 + ma2
>>> combined_ma.antenna_names
array(['Ant01', 'Ant06', 'Ant08', 'Ant12'], dtype='<U5')
.. seealso::
More details on this class usage can be found in
:ref:`array_configuration_doc` and :ref:`instrument_properties_doc`.
.. rubric:: Attributes Summary
.. autosummary::
~MiniArray.index
~MiniArray.rotation
~nenupy.instru.interferometer.Interferometer.position
~nenupy.instru.interferometer.Interferometer.antenna_names
~nenupy.instru.interferometer.Interferometer.antenna_positions
~nenupy.instru.interferometer.Interferometer.antenna_gains
~nenupy.instru.interferometer.Interferometer.baselines
~nenupy.instru.interferometer.Interferometer.size
~nenupy.instru.interferometer.Interferometer.antenna_weights
~nenupy.instru.interferometer.Interferometer.antenna_delays
.. rubric:: Methods Summary
.. autosummary::
~MiniArray.beam
~MiniArray.effective_area
~MiniArray.instrument_temperature
~MiniArray.attenuation_from_zenith
~MiniArray.analog_pointing
~MiniArray.beamsquint_correction
~nenupy.instru.interferometer.Interferometer.plot
~nenupy.instru.interferometer.Interferometer.array_factor
~nenupy.instru.interferometer.Interferometer.system_temperature
~nenupy.instru.interferometer.Interferometer.sefd
~nenupy.instru.interferometer.Interferometer.sensitivity
~nenupy.instru.interferometer.Interferometer.angular_resolution
~nenupy.instru.interferometer.Interferometer.confusion_noise
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self,
index: int = 0,
antenna_delays: np.ndarray = None,
antenna_weights: np.ndarray = None
):
self.index = index
try:
ma_name = f'MA{self.index:03d}'
position = EarthLocation(
lat=nenufar_miniarrays[ma_name]['lat'] * u.deg,
lon=nenufar_miniarrays[ma_name]['lon'] * u.deg,
height=nenufar_miniarrays[ma_name]['height'] * u.m
)
except KeyError:
raise MiniArrayUnknownIndex(self.index)
except:
raise MiniArrayBadIndexFormat(self.index)
antenna_names = np.array([ant for ant in miniarray_antennas.keys()])
antPos = np.array([ant['position'] for ant in miniarray_antennas.values()])
self.rotation = nenufar_miniarrays[ma_name]['rotation'] * u.deg
#rotation = np.radians(self.rotation.value + 180)
rotation = np.radians(360-self.rotation.value)
#rotation = np.radians(self.rotation.value)
rotMatrix = np.array(
[
[np.cos(rotation), -np.sin(rotation), 0],
[-np.sin(rotation), -np.cos(rotation), 0],
[0, 0, 1]
]
)
antenna_positions = np.dot(antPos, rotMatrix).astype(np.float32)
antenna_gains = np.array([
self._antenna_gain for _ in range(antenna_names.size)
])
super().__init__(position=position,
antenna_names=antenna_names,
antenna_positions=antenna_positions,
antenna_gains=antenna_gains,
antenna_delays=antenna_delays,
antenna_weights=antenna_weights
)
def __repr__(self):
return f"{self.__class__}(index={self.index})"
def __str__(self):
return f"{self.__class__.__name__}(index={self.index}, antennas={self.antenna_names})"
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def index(self) -> int:
""" Mini-Array index.
'Core' Mini-Arrays have indices ranging
from ``0`` to ``95``. 'Remote' Mini-Arrays have indices
ranging from ``100`` to ``105``.
:setter: Mini-Array index.
:getter: Mini-Array index.
:type: `int`
"""
return self._index
@index.setter
def index(self, i: int):
self._index = i
@property
def rotation(self) -> u.Quantity:
""" Mini-Array rotation.
Each NenuFAR Mini-Array has its own rotation with
respect to the others by angles multiple of 10 deg.
:setter: Mini-Array rotation.
:getter: Mini-Array rotation.
:type: :class:`~astropy.units.Quantity`
"""
return self._rotation
@rotation.setter
def rotation(self, r: u.Quantity):
self._rotation = r
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def beam(self,
sky: Sky,
pointing: Pointing,
configuration: NenuFAR_Configuration = NenuFAR_Configuration(),
return_complex: bool = False,
normalize: bool = True
) -> Sky:
r""" Computes the Mini-Array beam over the ``sky`` for a given
``pointing``.
.. math::
\mathcal{G}_{\rm MA}(\nu, \phi, \theta) = \mathcal{F}_{\rm MA}(\nu, \phi, \theta) \mathcal{G}_{\rm ant} (\nu, \phi, \theta)
where :math:`\nu` is the frequency, :math:`\phi` is the azimuth,
:math:`\theta` is the elevation,
:math:`\mathcal{G}_{\rm ant}` is the NenuFAR dipole antenna radiation pattern and
:math:`\mathcal{F}_{\rm MA}` is the array factor.
This method considers the ``sky`` as the desired output (in terms of
time, frequency, polarization and sky positions). It evaluates the effective
pointing directions for every time step defined in ``sky`` regarding
the ``pointing`` input.
:param sky:
Desired output contained in a :class:`~nenupy.astro.sky.Sky` instance.
(:attr:`~nenupy.astro.sky.Sky.time`, :attr:`~nenupy.astro.sky.Sky.frequency`,
:attr:`~nenupy.astro.sky.Sky.polarization` and
:attr:`~nenupy.astro.sky.Sky.coordinates` are used as inputs for the computation).
:type sky:
:class:`~nenupy.astro.sky.Sky`
:param pointing:
Instance of :class:`~nenupy.astro.pointing.Pointing` that defines
the targeted pointing directions over the time.
:type pointing:
:class:`~nenupy.astro.pointing.Pointing`
:param configuration:
NenuFAR configuration to consider during the beam simulation.
The beamsquint correction and its frequency setting are defined here.
Default is ``NenuFAR_Configuration(beamsquint_correction=True, beamsquint_frequency=50MHz)``.
:type configuration:
:class:`~nenupy.instru.nenufar.NenuFAR_Configuration`
:returns:
The instance of :class:`~nenupy.astro.sky.Sky`
given as input is returned, its attribute
:attr:`~nenupy.astro.sky.Sky.value` is updated
with the result of the beam computation (stored as
an :class:`~dask.array.Array`) and shaped as
``(time, frequency, polarization, coordinates)``.
:rtype:
:class:`~nenupy.astro.sky.Sky`
:Example:
Load the required librairies:
>>> from nenupy.instru import MiniArray, Polarization
>>> from nenupy.astro.sky import HpxSky
>>> from nenupy.astro.pointing import Pointing
>>> import astropy.units as u
>>> from astropy.time import Time, TimeDelta
Define a desired :class:`~nenupy.astro.sky.Sky` output:
>>> sky = HpxSky(
resolution=1.*u.deg,
frequency=np.array([25, 50, 75])*u.MHz,
polarization=np.array([Polarization.NW, Polarization.NE]),
time=Time("2021-10-15 20:00:00")
)
Define the pointing of the Mini-Array:
>>> ma_pointing = Pointing.zenith_tracking(
time=Time("2021-10-15 00:00:00"),
duration=TimeDelta(3600*24, format="sec")
)
Select the Mini-Array (and possibly its antenna distribution) and compute its response pattern:
>>> ma = MiniArray(1)
>>> beam = ma.beam(
sky=sky,
pointing=ma_pointing
)
Calling :meth:`print` on a :class:`~nenupy.astro.sky.Sky` object
enables the display of its :attr:`~nenupy.astro.sky.Sky.value` attribute structure
(which matches the definition of the ``sky`` instance):
>>> print(beam)
<class 'nenupy.astro.sky.HpxSky'> instance
value: (1, 3, 2, 49152)
* time: (1,)
* frequency: (3,)
* polarization: (2,)
* coordinates: (49152,)
To :meth:`~nenupy.astro.sky.SkySliceBase.plot` the computed Mini-Array response at 75 MHz, in NE polarization:
>>> beam[0, 2, 1].plot(
decibel=True,
colorbar_label=''
)
.. image:: ./_images/instru_images/ma1_beam.png
:width: 800
.. seealso::
:meth:`~nenupy.instru.interferometer.Interferometer.array_factor` and :ref:`beam_simulation_doc`
"""
log.info(
f"Computing <class 'MiniArray'> beam ({self.size} "
f"antennas, {sky.time.size} time and "
f"{sky.frequency.size} frequency slots)."
)
# Computing the Mini-Array effective area.
# aeff = self.effective_area(sky.frequency).to(u.m**2).value
# The beam is computed thanks to the Interferometer super method.
# The returned value is only divided by Aeff.
return super().beam(
sky=sky,
pointing=self.analog_pointing(pointing, configuration=configuration),
return_complex=return_complex,
normalize=normalize
)# / aeff[None, :, None, None]
[docs]
def effective_area(self,
frequency: u.Quantity = 50*u.MHz,
elevation: u.Quantity = 90*u.deg
) -> u.Quantity:
r""" Computes the effective area of a NenuFAR Mini-Array.
The effective area of a Mini-Array (:math:`\mathcal{A}_{\rm eff,\ MA}`) is
computed as the sum of dipole effective areas (:math:`\mathcal{A}_{\rm eff, ant}`),
while taking into account overlaps.
This is a function of ``frequency`` (:math:`\nu`) and ``elevation``
(:math:`\theta`):
.. math::
\mathcal{A}_{\rm eff,\ MA} (\nu) = \sum_{\rm ant} \mathcal{A}_{\rm eff, ant} (\nu) \sin( \theta )
with
.. math::
\mathcal{A}_{\rm eff, ant} (\nu) = \frac{\lambda^2}{3}
the NenuFAR dipole antenna effective area.
:param frequency:
Frequency at which the effective area is computed.
Default is 50 MHz.
:type frequency:
:class:`~astropy.units.Quantity`
:param elevation:
Elevation at which the effective area is computed.
Default is 90 deg, i.e., as seen from the zenith.
:type elevation:
:class:`~astropy.units.Quantity`
:returns:
Effective area of a Mini-Array shaped as ``frequency``.
:rtype:
:class:`~astropy.units.Quantity`
:Example:
>>> from nenupy.instru import MiniArray
>>> import astropy.units as u
>>> ma = MiniArray()
>>> ma.effective_area(50*u.MHz)
227.68377 m2
>>> ma = MiniArray()
>>> ma.effective_area(frequency=50*u.MHz, elevation=45*u.deg)
160.99673 m2
>>> ma = MiniArray()["Ant01"]
>>> ma.effective_area(50*u.MHz)
11.979179 m2
>>> ma = MiniArray()
>>> ma.effective_area(u.Quantity([20, 30, 40], unit='MHz'))
[693.44216, 532.97815, 355.85306] m2
.. seealso::
:ref:`effective_area_sec`
"""
log.debug(
f"Mini-Array effective area, using {self.size} Antennas."
)
# Antenna Effective Area, formula for a dipole antenna.
k = 3
wavelength = frequency.to(
u.m,
equivalencies=u.spectral()
)
antenna_effective_area = wavelength**2 / k
radius_ant_eff_area = np.sqrt(antenna_effective_area/np.pi)
max_radius = np.max(radius_ant_eff_area)
n = 500 # grid resolution
ant_pos = self.antenna_positions * u.m
x_grid = np.linspace(
ant_pos[:, 0].min() - max_radius,
ant_pos[:, 0].max() + max_radius,
n
)
dx = x_grid[1] - x_grid[0]
y_grid = np.linspace(
ant_pos[:, 1].min() - max_radius,
ant_pos[:, 1].max() + max_radius,
n
)
dy = y_grid[1] - y_grid[0]
xx_grid, yy_grid = np.meshgrid(x_grid, y_grid)
dist = np.linalg.norm(
ant_pos[:, :2][..., None, None] -\
np.array([xx_grid, yy_grid]) * u.m,
axis=1
)
return np.sum(
np.any(
(dist <= radius_ant_eff_area) if radius_ant_eff_area.isscalar else\
(dist[..., None] <= radius_ant_eff_area),
axis=0
),
axis=(0, 1)
) * dx * dy * np.sin(elevation.to(u.rad).value)
[docs]
def attenuation_from_zenith(self,
coordinates,
time: Time = Time.now(),
frequency: u.Quantity = 50*u.MHz,
polarization: Polarization = Polarization.NW
):
""" Returns the attenuation factor evaluated at given ``coordinates``
compared to the zenithal Mini-Array beam gain.
:param coordinates:
Sky positions equatorial coordinates.
:type coordinates:
:class:`~astropy.coordinates.SkyCoord`
:param time:
UTC time at which the attenuation is evaluated. Default is ``now``.
:type time:
:class:`~astropy.time.Time`
:param frequency:
Frequency at which the attenuation is evaluated. Default is ``50 MHz``.
:type frquency:
:class:`~astropy.units.Quantity`
:param polarization:
NenuFAR antenna polarization. Default is ``Polarization.NW``.
:type polarization:
:class:`~nenupy.instru.nenufar.Polarization`
:returns:
Attenuation factor shaped as ``(time, frequency, polarization, coordinates)``.
``NaN`` is returned for any ``coordinates`` that is below the horizon.
:rtype:
:class:`~numpy.ndarray`
:Example:
>>> from nenupy.instru.nenufar import MiniArray
>>> from astropy.coordinates import SkyCoord
>>> ma = MiniArray(index=0)
>>> attenuation = ma.attenuation_from_zenith(
coordinates=SkyCoord.from_name("Cyg A")
)
>>> from nenupy.instru.nenufar import MiniArray
>>> from astropy.coordinates import SkyCoord
>>> import astropy.units as u
>>> ma = MiniArray(index=0)
>>> attenuation = ma.attenuation_from_zenith(
coordinates=SkyCoord.from_name("Cyg A"),
frequency=np.linspace(20, 80, 10)*u.MHz
)
.. versionadded:: 2.0.0
"""
# Define the pointing towards the zenith
pointing = Pointing.zenith_tracking(
time=time.reshape((1,)),
duration=TimeDelta(10, format="sec")
)
# Compute the local zenith in equatorial coordinates
local_zenith = SkyCoord(180, 90,
unit="deg",
frame=AltAz(
obstime=time,
location=nenufar_position
)
).transform_to(coordinates.frame)
# Find the coordinates below the horizon and compute a mask
input_coord_altaz = radec_to_altaz(
radec=coordinates,
time=time
)
invisible_mask = input_coord_altaz.alt.deg <= 0
# Concatenate local_zenith and coordinates
if coordinates.obstime is None:
coordinates.obstime = local_zenith.obstime
if coordinates.location is None:
coordinates.location = local_zenith.location
if coordinates.isscalar:
coordinates = coordinates.reshape((1,))
coordinates = coordinates.insert(0, local_zenith)
# Prepare a Sky instance for the beam simulation
sky = Sky(
coordinates=coordinates,
frequency=frequency,
time=time,
polarization=polarization
)
# Compute the beam
beam = self.beam(sky=sky, pointing=pointing)
# Compute the attenuation factor relative to the zenith (first member)
values = beam.value.compute()
output_values = values[..., 1:]/np.expand_dims(values[..., 0], 3)
output_values[..., invisible_mask] = np.nan
return output_values
[docs]
@staticmethod
def instrument_temperature(frequency: u.Quantity = 50*u.MHz, lna_filter: int = 0) -> u.Quantity:
""" Instrument temperature at a given ``frequency``.
This depends on the `Low Noise Amplifier <https://nenufar.obs-nancay.fr/en/astronomer/#antennas>`_
characteristics.
:param frequency:
Frequency at which computing the instrument temperature.
Default is ``50 MHz``.
:type frequency:
:class:`~astropy.units.Quantity`
:param lna_filter:
Local Noise Amplifier high-pass filter selection.
Available values are ``0, 1, 2, 3``.
They correspond to minimal frequencies ``10, 15, 20, 25 MHz`` respectively.
Default is ``0``, i.e., 10 MHz filter.
:type lna_filter:
`int`
:returns:
Instrument temperature in Kelvins
:rtype:
:class:`~astropy.units.Quantity`
.. warning::
For the time being, only ``lna_filter`` values ``0`` and ``3`` are available.
:Example:
>>> from nenupy.instru import MiniArray
>>> import astropy.units as u
>>> ma = MiniArray()
>>> ma.instrument_temperature(frequency=70*u.MHz)
526.11213 K
.. seealso::
:func:`~nenupy.astro.astro_tools.sky_temperature`
"""
return instrument_temperature(frequency=frequency, lna_filter=lna_filter)
def _order_to_skycoord(self, order: tuple) -> SkyCoord:
""" """
pointing_grid = self._generate_analog_directions()
return pointing_grid[order]
def _skycoord_to_order(self, coordinates: SkyCoord) -> tuple:
""" """
if coordinates.size != 1:
raise ValueError(
"Only size 1 `coordinates` are accepted."
)
pointing_grid = self._generate_analog_directions()
separations = coordinates.separation(pointing_grid)
order = np.array(
np.unravel_index(
np.argmin(separations, axis=None),
separations.shape
)
)
order[order >= 64] -= 1
return tuple(order)
[docs]
def beamsquint_correction(self, coords: SkyCoord, frequency: u.Quantity = 50*u.MHz) -> SkyCoord:
""" Corrects for the beamsquint effect.
:Example:
>>> from astropy.coordinates import SkyCoord, AltAz
>>> from astropy.time import Time
>>> import astropy.units as u
>>> from nenupy import nenufar_position
>>> from nenupy.instru import MiniArray
>>> position = SkyCoord(
0*u.deg,
30*u.deg,
frame=AltAz(
obstime=Time("2021-01-01 12:00:00"),
location=nenufar_position
)
)
>>> ma = MiniArray()
>>> corrected_position = ma.beamsquint_correction(
coords=position,
frequency=50*u.MHz
)
>>> corrected_position.az.deg, corrected_position.alt.deg
(0., 22.91422672)
"""
freq_idx = np.argmin(
np.abs(squint_table['freq'] - frequency.to(u.MHz).value)
)
azimuths = coords.az
elevations = coords.alt
elevations = np.interp(elevations.deg, squint_table['elev_desiree'][freq_idx, :], squint_table['elev_a_pointer'])
# Squint is limited at 20 deg elevation, otherwise the
# pointing can vary drasticaly as the available pointing
# positions become sparse at low elevation.
# elevations[elevations < 20] = 20
return SkyCoord(
azimuths,
elevations * u.deg,
frame=coords.frame
)
[docs]
def analog_pointing(self, pointing: Pointing, configuration: NenuFAR_Configuration) -> Pointing:
""" Converts the desired pointing to the effective pointing
which depends on the available pointing positions defined
on a grid due to analog cable delays.
"""
# Put the horizontal coordinates in a good shape
pointing_ho_coords = pointing.horizontal_coordinates # TODO try to copy instead of using a pointer which modifies the top object
if pointing_ho_coords.isscalar:
pointing_ho_coords = pointing_ho_coords.reshape((1,))
# Correct the pointing for beamsquint effect, that is, point at a
# lower elevation than the one desired
if configuration.beamsquint_correction:
pointing_ho_coords = self.beamsquint_correction(
coords=pointing_ho_coords,
frequency=configuration.beamsquint_frequency
)
coord = SkyCoord(
pointing_ho_coords.az,
pointing_ho_coords.alt
)
orders = list(map(self._skycoord_to_order, coord))
altaz_list = list(map(self._order_to_skycoord, orders))
azimuths = [position.ra.deg for position in altaz_list] * u.deg
elevations = [position.dec.deg for position in altaz_list] * u.deg
pointing.custom_ho_coordinates = SkyCoord(
azimuths.reshape(pointing_ho_coords.shape),
elevations.reshape(pointing_ho_coords.shape),
frame=pointing_ho_coords.frame
)
return pointing
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
#def _antenna_gain(self, sky: Sky, pointing: Pointing):
# return 1.
@lru_cache(maxsize=1)
def _antenna_gain(self, sky: Sky, pointing: Pointing):
"""
"""
import dask.array as da
gain = da.ones(
(
sky.time.size,
sky.frequency.size,
sky.polarization.size,
sky.coordinates.size
),
dtype=np.float64
)
log.debug(f"Antenna gain shape: {gain.shape}.")
for i, pol in enumerate(sky.polarization):
if not isinstance(pol, Polarization):
log.warning(
f"Invalid value encountered in <attr 'Sky.polarization'>: '{pol}'. "
f"Polarization has been set to '{Polarization.NW}' by default."
)
pol = Polarization.NW
gain[:, :, i, :] = pol.value[sky]
return gain
def _toITRF(self):
"""
"""
return self.antenna_positions
def _generate_analog_directions(self) -> SkyCoord:
""" """
from astropy.coordinates import Latitude, Longitude, SkyCoord
DX = 2*5.5
DY = DX*np.cos(np.pi/6)
DMINX = 0.165
DMINY = DMINX*np.cos(np.pi/6)
DMIN_D = DMINX/DX
NBITS = 7
BITS = 2**(NBITS - 1)
bits = np.arange(2*BITS)
xx, yy = np.meshgrid(bits, bits)
xx_mask = xx >= 64
yy_mask = yy >= 64
k1 = (xx - BITS + 1)*DMIN_D
k2 = (BITS - 1 - yy)*DMIN_D
k1[xx_mask] = (xx[xx_mask] - BITS)*DMIN_D
k2[yy_mask] = (BITS - yy[yy_mask])*DMIN_D
# theta = 0.5*np.arccos(1 - 2*(k1**2 + k2**2))
with np.errstate(invalid='ignore'):
theta = np.pi/2 - ( 0.5*np.arccos(1 - 2*(k1**2 + k2**2)) )
bad_values = np.isnan(theta)
# phi = np.arctan2(k2, k1) + np.pi
phi = np.pi/2 - (np.arctan2(k2, k1) + np.pi) + self.rotation.to("rad").value
theta[bad_values] = -np.pi/2
phi[bad_values] = 0.
return SkyCoord(
Longitude(phi, unit="rad"),
Latitude(theta, unit="rad"),
).T
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------------- NenuFAR -------------------------- #
# ============================================================= #
[docs]
class NenuFAR(Interferometer):
""" Main class to handle a NenuFAR array.
.. versionadded:: 2.0.0
:param miniarray_antennas:
Mini-Arrays antennas selection.
Default is ``numpy.r_[:19]``, i.e., the full 19 dipole antennas.
See :class:`~nenupy.instru.nenufar.MiniArray` for different input values.
:type miniarray_antennas:
`numpy.ndarray` or `slice`
:param include_remote_mas:
Include or not the remote Mini-Arrays.
Default is ``False``, i.e., only the dense 'core' of 96 Mini-Arrays is considered.
:type include_remote_mas:
`bool`
:Example:
Instantiating :class:`~nenupy.instru.nenufar.NenuFAR`:
>>> from nenupy.instru import NenuFAR
>>> nenufar = NenuFAR()
Sub-arraying on an existing :class:`~nenupy.instru.nenufar.NenuFAR` instance:
>>> sub_nenufar = NenuFAR()["MA001", "MA002", "MA104"]
>>> sub_nenufar.antenna_names
array(['MA001', 'MA002'], dtype='<U5')
If :attr:`~nenupy.instru.nenufar.NenuFAR.include_remote_mas` is ``True``,
the remote Mini-Arrays are included in the array and selecting ``MA104``
as above would take this remote Mini-Array into account:
>>> sub_nenufar = NenuFAR(include_remote_mas=True)["MA001", "MA002", "MA104"]
>>> sub_nenufar.antenna_names
array(['MA001', 'MA002', 'MA104'], dtype='<U5')
Combining two :class:`~nenupy.instru.nenufar.NenuFAR` instances:
>>> nenufar1 = NenuFAR()["MA001", "MA006"]
>>> nenufar2 = NenuFAR()["MA010", "MA056"]
>>> resulting_array = nenufar1 + nenufar2
>>> resulting_array.antenna_names
array(['MA001', 'MA006', 'MA010', 'MA056'], dtype='<U5')
.. note::
The result of the addition operation, namely ``resulting_array`` in this example
will conserve the properties of the first member, namely ``nenufar1``.
This is particularly true for the attributes :attr:`~nenupy.instru.nenufar.NenuFAR.include_remote_mas`
and :attr:`~nenupy.instru.nenufar.NenuFAR.miniarray_antennas`.
.. seealso::
More details on this class usage can be found in
:ref:`array_configuration_doc` and :ref:`instrument_properties_doc`.
.. rubric:: Attributes Summary
.. autosummary::
~NenuFAR.miniarray_antennas
~NenuFAR.include_remote_mas
~NenuFAR.miniarray_rotations
~nenupy.instru.interferometer.Interferometer.position
~nenupy.instru.interferometer.Interferometer.antenna_names
~nenupy.instru.interferometer.Interferometer.antenna_positions
~nenupy.instru.interferometer.Interferometer.antenna_gains
~nenupy.instru.interferometer.Interferometer.baselines
~nenupy.instru.interferometer.Interferometer.size
.. rubric:: Methods Summary
.. autosummary::
~NenuFAR.beam
~NenuFAR.effective_area
~NenuFAR.attenuation_from_zenith
~NenuFAR.instrument_temperature
~nenupy.instru.interferometer.Interferometer.plot
~nenupy.instru.interferometer.Interferometer.array_factor
~nenupy.instru.interferometer.Interferometer.system_temperature
~nenupy.instru.interferometer.Interferometer.sefd
~nenupy.instru.interferometer.Interferometer.sensitivity
~nenupy.instru.interferometer.Interferometer.angular_resolution
~nenupy.instru.interferometer.Interferometer.confusion_noise
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self, miniarray_antennas: np.ndarray = np.r_[:19], include_remote_mas: bool = False):
self.miniarray_antennas = miniarray_antennas
self.include_remote_mas = include_remote_mas
antenna_names = np.array([ma for ma in nenufar_miniarrays.keys()])
antenna_positions = np.array(
[ma['position'] for ma in nenufar_miniarrays.values()],
dtype=np.float32
)
antenna_gains = np.array([
MiniArray(
index=ma['id']
)[self.miniarray_antennas].beam for ma in nenufar_miniarrays.values()
])
if not self.include_remote_mas:
# Exclude the distant Mini-Arrays from the element list
mask_distant = ~np.array([name.startswith('MA1') for name in antenna_names])
antenna_names = antenna_names[mask_distant]
antenna_positions = antenna_positions[mask_distant, :]
antenna_gains = antenna_gains[mask_distant]
super().__init__(
position=nenufar_position,
antenna_names=antenna_names,
antenna_positions=antenna_positions,
antenna_gains=antenna_gains
)
def __repr__(self):
"""
"""
return f"{self.__class__}(nMAS={self.size})"
def __str__(self):
"""
"""
return f"{self.__class__.__name__}"
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def miniarray_rotations(self) -> u.Quantity:
"""
"""
return np.array([
nenufar_miniarrays[ma]['rotation'] for ma in self.antenna_names
])*u.deg
@property
def miniarray_antennas(self):
""" List of Mini-Array antennas. """
return self._miniarray_antennas
@miniarray_antennas.setter
def miniarray_antennas(self, antennas):
self._miniarray_antennas = antennas
@property
def include_remote_mas(self) -> bool:
""" """
return self._include_remote_mas
@include_remote_mas.setter
def include_remote_mas(self, include):
if not isinstance(include, bool):
raise TypeError(
"`include_remote_mas` - Boolean value expected."
)
self._include_remote_mas = include
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def beam(self,
sky: Sky,
pointing: Pointing,
analog_pointing: Pointing = None,
configuration: NenuFAR_Configuration = NenuFAR_Configuration(),
return_complex: bool = False,
normalize: bool = True
) -> Sky:
r""" Computes the NenuFAR beam over the ``sky`` for a given
``pointing``.
.. math::
\mathcal{G}_{\rm NenuFAR}(\nu, \phi, \theta) = \mathcal{F}_{\rm NenuFAR} (\nu, \phi, \theta) \sum_{\rm MA} \mathcal{G}_{\rm MA}(\nu, \phi, \theta)
where :math:`\nu` is the frequency, :math:`\phi` is the azimuth,
:math:`\theta` is the elevation,
:math:`\mathcal{G}_{\rm MA}` is the MiniArray response (see :meth:`~nenupy.instru.nenufar.MiniArray.beam`)
and :math:`\mathcal{F}_{\rm NenuFAR}` is the array factor.
This method considers the ``sky`` as the desired output (in terms of
time, frequency, polarization and sky positions). It evaluates the effective
pointing directions for every time step defined in ``sky`` regarding
the ``pointing`` input.
:param sky:
Desired output contained in a :class:`~nenupy.astro.sky.Sky` instance.
(:attr:`~nenupy.astro.sky.Sky.time`, :attr:`~nenupy.astro.sky.Sky.frequency`,
:attr:`~nenupy.astro.sky.Sky.polarization` and
:attr:`~nenupy.astro.sky.Sky.coordinates` are used as inputs for the computation).
:type sky:
:class:`~nenupy.astro.sky.Sky`
:param pointing:
Instance of :class:`~nenupy.astro.pointing.Pointing` that defines
the targeted **numerical** pointing directions over the time.
:type pointing:
:class:`~nenupy.astro.pointing.Pointing`
:param analog_pointing:
Instance of :class:`~nenupy.astro.pointing.Pointing` that defines
the **analog** pointing directions over the time.
This pointing is subject to beamsquint corrections.
:type analog_pointing:
:class:`~nenupy.astro.pointing.Pointing`
:param configuration:
NenuFAR configuration to consider during the beam simulation.
The beamsquint correction and its frequency setting are defined here.
Default is ``NenuFAR_Configuration(beamsquint_correction=True, beamsquint_frequency=50MHz)``.
:type configuration:
:class:`~nenupy.instru.nenufar.NenuFAR_Configuration`
:returns:
The instance of :class:`~nenupy.astro.sky.Sky`
given as input is returned, its attribute
:attr:`~nenupy.astro.sky.Sky.value` is updated
with the result of the beam computation (stored as
an :class:`~dask.array.Array`) and shaped as
``(time, frequency, polarization, coordinates)``.
:rtype:
:class:`~nenupy.astro.sky.Sky`
.. seealso::
:meth:`~nenupy.instru.interferometer.Interferometer.array_factor` and :ref:`beam_simulation_doc`
"""
log.info(
f"Computing <class 'NenuFAR'> beam ({self.size} "
f"Mini-Arrays, {sky.time.size} time and "
f"{sky.frequency.size} frequency slots)."
)
# Sorting out the analog pointing, make it equal to the
# numerical pointing if it is not specifically defined.
if not analog_pointing:
analog_pointing = pointing
log.info(
"Analog pointing is set according to the numerical pointing."
)
# Computing the Array Factor of the whole NenuFAR array.
array_factor = self.array_factor(
sky=sky,
pointing=pointing,
return_complex=return_complex
)
# Finding the unique Mini-Array rotations and the number
# of MAs corresponding to each rotation.
rots, indices, counts = np.unique(
self.miniarray_rotations.to(u.deg).value%60,
return_counts=True,
return_index=True
)
# Summing all different (due to rotation) Mini-Array beam
# patterns, although only executing it at most 6 times
# because there could only be 6 different rotations.
# Even though antGain updates the same sky instance, the
# value attr * count creates new memeory allocations.
antenna_gain = np.sum(
np.array([
gain(
sky=sky,
pointing=analog_pointing,
configuration=configuration,
return_complex=return_complex,
normalize=normalize
).value*count for gain, count in zip(self.antenna_gains[indices], counts)
]),
axis=0
)/np.sum(counts) # TODO check that this is correct to normalize
# Updating the sky object value array where the the sky
# is above the horizon as the product of the NenuFAR array
# factor and the combined Mini-Array gain patterns.
sky.value = array_factor * antenna_gain
return sky
[docs]
def effective_area(self,
frequency: u.Quantity = 50*u.MHz,
elevation: u.Quantity = 90*u.deg
) -> u.Quantity:
r""" Computes the effective area of NenuFAR.
The effective area of NenuFAR (:math:`\mathcal{A}_{\rm eff,\ NenuFAR}`)
is computed as :math:`n_{\rm Mini-Arrays}` times the effective area
of one Mini-Array (:math:`\mathcal{A}_{\rm eff,\ MA}`) as a
function of the ``frequency`` :math:`\nu`, where
:math:`n_{\rm Mini-Arrays}` is the number of Mini-Arrays included.
This method also takes into account the active antennas within
each Mini-Array (such as defined by
:attr:`~nenupy.instru.nenufar.NenuFAR.miniarray_antennas`).
.. math::
\mathcal{A}_{\rm eff,\ NenuFAR} (\nu) = n_{\rm Mini-Arrays} \mathcal{A}_{\rm eff,\ MA} (\nu)
:param frequency:
Frequency at which the effective area is computed.
Default is 50 MHz.
:type frequency:
:class:`~astropy.units.Quantity`
:param elevation:
Elevation at which the effective area is computed.
Default is 90 deg, i.e., as seen from the zenith.
:type elevation:
:class:`~astropy.units.Quantity`
:returns:
Effective area of NenuFAR shaped as ``frequency``.
:rtype:
:class:`~astropy.units.Quantity`
:Example:
>>> from nenupy.instru import NenuFAR
>>> import astropy.units as u
>>> nenufar = NenuFAR()
>>> nenufar.effective_area(50*u.MHz)
18214.701 m2
>>> from nenupy.instru import NenuFAR
>>> import astropy.units as u
>>> nenufar = NenuFAR()
>>> nenufar.effective_area(u.Quantity([20, 30, 40], unit='MHz'))
[55475.372, 42638.252, 28468.245] m2
.. seealso::
:meth:`~nenupy.instru.nenufar.MiniArray.effective_area`
for the computation of :math:`\mathcal{A}_{\rm eff,\ MA}`
and :ref:`effective_area_sec`.
"""
log.debug(
f"NenuFAR effective area, using {self.size} Mini-Arrays "
f"of {self.miniarray_antennas.size} antennas each."
)
# Compute the Mini-Array effective area. Select the active
# antennas in case not all of them are used. By default the
# MA 0 is used but it's the same for every MA.
miniarray_effective_area = MiniArray()[self.miniarray_antennas].effective_area(
frequency=frequency,
elevation=elevation
)
# The NenuFAR array effective area is then only the Mini-Array
# effective area times the number of MAs since there is no
# overlay between individual MA Aeff.
return miniarray_effective_area * self.size
[docs]
@staticmethod
def instrument_temperature(frequency: u.Quantity = 50*u.MHz, lna_filter: int = 0) -> u.Quantity:
""" Instrument temperature at a given ``frequency``.
This depends on the `Low Noise Amplifier <https://nenufar.obs-nancay.fr/en/astronomer/#antennas>`_
characteristics.
:param frequency:
Frequency at which computing the instrument temperature.
Default is ``50 MHz``.
:type frequency:
:class:`~astropy.units.Quantity`
:param lna_filter:
Local Noise Amplifier high-pass filter selection.
Available values are ``0, 1, 2, 3``.
They correspond to minimal frequencies ``10, 15, 20, 25 MHz`` respectively.
Default is ``0``, i.e., 10 MHz filter.
:type lna_filter:
`int`
:returns:
Instrument temperature in Kelvins
:rtype:
:class:`~astropy.units.Quantity`
:Example:
>>> from nenupy.instru import MiniArray
>>> import astropy.units as u
>>> ma = MiniArray()
>>> ma.instrument_temperature(frequency=70*u.MHz)
526.11213 K
.. seealso::
:func:`~nenupy.astro.astro_tools.sky_temperature`
"""
return instrument_temperature(frequency=frequency, lna_filter=lna_filter)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _toITRF(self):
"""
"""
t = Transformer.from_crs(
crs_from='EPSG:2154', # RGF93
crs_to='EPSG:4896'# ITRF2005
)
antPos = self.antenna_positions.copy()
antPos[:, 0], antPos[:, 1], antPos[:, 2] = t.transform(
xx=antPos[:, 0],
yy=antPos[:, 1],
zz=antPos[:, 2]
)
return antPos
# ============================================================= #
# ============================================================= #