Source code for nenupy.astro.beam_correction

"""
"""

__all__ = [
   "compute_jones_matrices",
   "compute_projection_corrections",
   "convert_to_mueller",
   "matrices_to_hdf5",
   "pointing_correction_factor"
]


from nenupy.astro.jones_mueller import JonesMatrix, MuellerMatrix
from nenupy.astro.pointing import Pointing
from nenupy.instru import NenuFAR, NenuFAR_Configuration, Polarization
from nenupy.astro.sky import Sky
from nenupy.instru.instrument_tools import pointing_correction

from astropy.coordinates import SkyCoord, ICRS
from astropy.time import Time, TimeDelta
import astropy.units as u
import numpy as np
import h5py
from scipy.interpolate import RegularGridInterpolator
import copy
from typing import Union, Tuple
import logging
log = logging.getLogger("nenupy")
log.setLevel(logging.INFO)

try:
    from dreambeam.rime.scenarios import on_pointing_axis_tracking
    from dreambeam.rime.jones import DualPolFieldPointSrc, PJones
    from dreambeam.telescopes.rt import load_mountedfeed
except ModuleNotFoundError:
    # This will raise an error eventually with an appropriate message
    pass

# ============================================================= #
# ------------------ compute_jones_matrices ------------------- #
[docs] def compute_jones_matrices( start_time: Time, time_step: TimeDelta, duration: TimeDelta, skycoord: SkyCoord, parallactic: bool = True ) -> Tuple[Time, u.Quantity, JonesMatrix]: """ """ log.info("\tComputing Jones matrices using DreamBeam...") if time_step.sec <= 1.: raise ValueError("DreamBeam does not allow for time intervals lesser than 1 sec.") try: times, frequencies, Jn, _ = on_pointing_axis_tracking( telescopename="NenuFAR", stnid="NenuFAR", band="LBA", antmodel="Hamaker-NEC4_Charrier_v1r2", obstimebeg=start_time.datetime, obsdur=duration.datetime, obstimestp=time_step.datetime, pointingdir=(skycoord.ra.rad, skycoord.dec.rad, "J2000"), do_parallactic_rot=parallactic ) except NameError: log.error( "DreamBeam is not installed. " "See installation instructions https://github.com/2baOrNot2ba/dreamBeam" ) raise # import numpy as np # times = start_time + TimeDelta(3600, format="sec")*np.arange(12) # frequencies = np.array([30e6, 50e6])*u.MHz # Jn = np.tile(np.array([[1, 1], [0, 1]]), (times.size, frequencies.size, 1, 1)) return Time(times, format="datetime"), frequencies*u.Hz, JonesMatrix(Jn)
# ============================================================= # # ============================================================= # # -------------- compute_projection_corrections --------------- #
[docs] def compute_projection_corrections( start_time: Time, time_step: TimeDelta, duration: TimeDelta, skycoord: SkyCoord, parallactic: bool = True ) -> Tuple[Time, u.Quantity, JonesMatrix]: """_summary_ Took apart on_pointing_axis_tracking method from Dreambeam to only take into account parallactic angle and projection effects. Parameters ---------- start_time : Time _description_ time_step : TimeDelta _description_ duration : TimeDelta _description_ skycoord : SkyCoord _description_ parallactic : bool, optional _description_, by default True Returns ------- Tuple[Time, u.Quantity, JonesMatrix] _description_ Raises ------ ValueError _description_ """ log.info("\tComputing Jones projection matrices using DreamBeam...") if time_step.sec <= 1.: raise ValueError("DreamBeam does not allow for time intervals lesser than 1 sec.") try: stnfeed = load_mountedfeed( tscopename="NenuFAR", station="NenuFAR", band="LBA", modelname="Hamaker-NEC4_Charrier_v1r2" ) stnrot = stnfeed.stnRot freqs = stnfeed.getfreqs() # list pointingdir = (skycoord.ra.rad, skycoord.dec.rad, "J2000") srcfld = DualPolFieldPointSrc(pointingdir) timespy = [] nrTimSamps = int((duration.datetime.total_seconds() / time_step.datetime.seconds)) + 1 for ti in range(0, nrTimSamps): timespy.append(start_time.datetime + ti * time_step.datetime) pjones = PJones(timespy, np.transpose(stnrot), do_parallactic_rot=parallactic) pjonesOfSrc = pjones.op(srcfld) jones = pjonesOfSrc.getValue() jones = np.repeat(jones[None, ...], len(freqs), axis=0) except NameError: log.error( "DreamBeam is not installed. " "See installation instructions https://github.com/2baOrNot2ba/dreamBeam" ) raise return Time(timespy, format="datetime"), freqs * u.Hz, JonesMatrix(jones)
# ============================================================= # # ============================================================= # # -------------------- convert_to_mueller --------------------- #
[docs] def convert_to_mueller(jones_matrices: JonesMatrix) -> MuellerMatrix: """ """ log.info(f"Converting {jones_matrices.shape} Jones matrices to Mueller matrices...") return jones_matrices.to_mueller()
# ============================================================= # # ============================================================= # # --------------------- matrices_to_hdf5 ---------------------- #
[docs] def matrices_to_hdf5( times: Time, frequencies: u.Quantity, matrices: Union[JonesMatrix, MuellerMatrix], file_name: str ) -> None: """ """ if not file_name.endswith(".hdf5"): raise ValueError("The name of the HDF5 file must end with '.hdf5'!") if matrices.shape[:-2] != frequencies.shape + times.shape: raise IndexError( "Inconsistent shapes detected. " f"times.shape={times.shape}, " f"frequencies.shape={frequencies.shape}, " f"matrices.shape={matrices.shape}, " ) log.info(f"Writing the results in {file_name}...") with h5py.File(file_name, "w") as f: f["data"] = matrices f["data"].dims[0].label = "frequency" f["data"].dims[1].label = "time" f["time"] = times.jd f["time"].make_scale("Time (JD)") f["frequency"] = frequencies.to(u.MHz).value f["frequency"].make_scale("Frequency (MHz)") f["data"].dims[0].attach_scale(f["frequency"]) f["data"].dims[1].attach_scale(f["time"]) log.info(f"{file_name} written!")
# ============================================================= # # ============================================================= # # ---------------- pointing_correction_factor ----------------- #
[docs] def pointing_correction_factor( digital_pointing: Pointing, analog_pointing: Pointing, times: Time, frequencies: u.Quantity, nenufar: NenuFAR = NenuFAR(), polarization: Union[np.ndarray, Polarization] = Polarization.NW, nenufar_config: NenuFAR_Configuration = NenuFAR_Configuration(), correction_year: str = "2022", return_interpolation: bool = False ) -> Union[np.ndarray, RegularGridInterpolator]: """Compute the factor needed to correct the intensity of the sources observed when NenuFAR (in beamformed mode) suffered from a pointing offset (i.e., before 2025 June 17). The output correction factor is an array shaped like (``times``, ``frequencies``, ``polarization``). There's no need in computing it on a fine time-frequency grid since, the evolution is quite smooth over a few minutes and can be interpolated to match the original dataset. Tis function simulates NenuFAR's beam and its effective offset in order to propose a correction factor. ``digital_pointing`` and ``analog_pointing`` need to reflect the desired pointing coordinates (i.e., no beamsquint correction nor empirical correction). Parameters ---------- digital_pointing : :class:`~nenupy.astro.pointing.Pointing` Digital pointing orders, that would typically follow an astrophysical source across time. analog_pointing : :class:`~nenupy.astro.pointing.Pointing` Analog pointing orders, given every 6 minutes to the NenuFAR Mini-Arrays. times : :class:`~astropy.time.Time` _description_ frequencies : :class:`~astropy.units.Quantity` _description_ nenufar : :class:`~nenupy.instru.nenufar.NenuFAR`, optional _description_, by default NenuFAR() polarization : :class:`~numpy.ndarray` | :class:`~nenupy.instru.nenufar.Polarization`, optional _description_, by default Polarization.NW nenufar_config : :class:`~nenupy.instru.nenufar.NenuFAR_Configuration`, optional _description_, by default NenuFAR_Configuration() correction_year : `str`, optional _description_ (see :func:`~nenupy.instru.instrument_tools.pointing_correction`), by default "2022" return_interpolation : `bool`, optional Return the interpolation function. If this option is selected the time, frequency and polarization axes are converted to Julian days / MHz / (0=`~nenupy.instru.nenufar.Polarization.NW`, 1=`~nenupy.instru.nenufar.Polarization.NE`) values for interpolation purposes (see example), by default `False` Returns ------- :class:`~numpy.ndarray` or :class:`~scipy.interpolate._rgi.RegularGridInterpolator` _description_ Examples -------- .. code-block:: python >>> from nenupy.astro.pointing import Pointing >>> import numpy as np >>> import astropy.units as u >>> pp_digi = Pointing.from_file("<file_name>.altazB", include_corrections=False) >>> pp_ana = Pointing.from_file("<file_name>.altazA", include_corrections=False) >>> t_steps = 50 >>> f_steps = 30 >>> times = pp_digi.time[0] + np.arange(t_steps) * (pp_digi.time[-1] - pp_digi.time[0])/t_steps >>> freqs = np.linspace(10, 80, f_steps) * u.MHz >>> factor = pointing_correction_factor( digital_pointing=pp_digi, analog_pointing=pp_ana, times=times, frequencies=freqs, correction_year="2022", return_interpolation=True ) >>> X, Y, Z = np.meshgrid(times.jd, freqs.to_value(u.MHz), [0], indexing='ij') >>> interpolated_factor = factor((X, Y, Z)) Warning ------- The correction factor is solely intended to be applied on the target astrophyiscal source's emission. The underlying (though most of the time dominant) background emission is not affected at the same scale by the pointing offset. See Also -------- <link to the report>. """ # Compute the pointing orders given to NenuFAR altaz_orders = pointing_correction( altaz_coordinates=digital_pointing.horizontal_coordinates, correction_year=correction_year ) # Apply the Lambert93 -> geo rotation to determine the effective coordinates rotation = 0.58 * u.deg real_altaz_orders = SkyCoord( altaz_orders.az - rotation, altaz_orders.alt, frame=altaz_orders.frame ) # Simulate the value of the numerical beam in comparison with old and new positions real_pointing = Pointing( coordinates=real_altaz_orders.transform_to(ICRS), time=real_altaz_orders.obstime, # duration=digital_pointing.duration duration=TimeDelta(np.diff(real_altaz_orders.obstime.jd, append=(real_altaz_orders.obstime[-1] + TimeDelta(1, format="jd")).jd), format="jd") ) beam_desired = nenufar.beam( sky=Sky( coordinates=copy.deepcopy(digital_pointing)[times].coordinates, time=times, frequency=frequencies, polarization=polarization ), pointing=copy.deepcopy(real_pointing), analog_pointing=copy.deepcopy(analog_pointing), configuration=nenufar_config ) beam_real = nenufar.beam( sky=Sky( coordinates=copy.deepcopy(real_pointing)[times].coordinates, time=times, frequency=frequencies, polarization=polarization ), pointing=copy.deepcopy(real_pointing), analog_pointing=copy.deepcopy(analog_pointing), configuration=nenufar_config ) # Compute the correction factor desired_val = np.diagonal(beam_desired.value, offset=0, axis1=0, axis2=3) real_val = np.diagonal(beam_real.value, offset=0, axis1=0, axis2=3) factor = (real_val / desired_val).compute() if return_interpolation: # Convert the polarization to index pol_size = beam_desired.polarization.size polars_int = np.zeros(pol_size) for i in range(pol_size): polars_int[i] = 0 if beam_desired.polarization[i] == Polarization.NW else 1 return RegularGridInterpolator( (times.jd, frequencies.to_value(u.MHz), polars_int), np.moveaxis(factor, 2, 0), bounds_error=False ) else: return np.moveaxis(factor, 2, 0) # (time, freq, polar)
# ============================================================= # # ============================================================= #