#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
********
Pointing
********
.. inheritance-diagram:: nenupy.astro.pointing.Pointing
:parts: 3
.. autosummary::
~Pointing
"""
__author__ = "Alan Loh"
__copyright__ = "Copyright 2021, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
"Pointing"
]
from astropy.coordinates.builtin_frames.icrs import ICRS
import numpy as np
import matplotlib.pyplot as plt
import logging
log = logging.getLogger(__name__)
from astropy.coordinates import (
SkyCoord,
AltAz,
EarthLocation
)
from astropy.time import Time, TimeDelta
import astropy.units as u
from nenupy import nenufar_position
from nenupy.astro import AstroObject, altaz_to_radec, radec_to_altaz
from nenupy.astro.target import Target
# from nenupy.io.bst import BST
# ============================================================= #
# ------------------------- Pointing -------------------------- #
# ============================================================= #
[docs]
class Pointing(AstroObject):
""" Class to handle instrument pointing.
:param coordinates:
Pointing coordinates (in the equatorial frame).
:type coordinates:
:class:`~astropy.coordinates.SkyCoord`
:param time:
Pointing times.
:type time:
:class:`~astropy.time.Time`
:param duration:
Pointing duration.
:type duration:
:class:`~astropy.time.TimeDelta`
:param observer:
Earth location from where the pointing is made.
:type observer:
:class:`~astropy.coordinates.EarthLocation`
.. seealso::
:ref:`pointing_doc` for more details on how to instantiate
and use this class.
.. versionadded:: 2.0.0
.. rubric:: Attributes Summary
.. autosummary::
~Pointing.time
~Pointing.duration
~Pointing.horizontal_coordinates
.. rubric:: Methods Summary
.. autosummary::
~Pointing.plot
~Pointing.from_file
~Pointing.from_bst
~Pointing.target_tracking
~Pointing.target_transit
~Pointing.zenith_tracking
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self,
coordinates: SkyCoord,
time: Time,
duration: TimeDelta = TimeDelta(1, format="sec"),
observer: EarthLocation = nenufar_position
):
self.coordinates = coordinates
self.time = time
self.duration = duration
self.observer = observer
def __str__(self):
return str(self.horizontal_coordinates)
def __getitem__(self, time: Time):
""" """
starts = (self.time).jd
stops = (self.time + self.duration).jd
if time.isscalar:
time = time.reshape(1,)
if self.coordinates.isscalar:
# coordinates = self.coordinates.reshape(1,)
coordinates = SkyCoord(
np.repeat(self.coordinates.ra.deg, self.time.size),
np.repeat(self.coordinates.dec.deg, self.time.size),
unit="deg"
)
else:
coordinates = self.coordinates
ras = coordinates.ra.deg
decs = coordinates.dec.deg
ra = []
dec = []
custom_az = []
custom_el = []
for t in time.jd:
# Find the corresponding RA/Dec
mask = (t >= starts) & (t < stops)
if np.all(~mask):
# No match
t = Time(t, format="jd")
log.warning(
f"Default zenith pointing at {t.isot}."
)
zenith = SkyCoord(
0*u.deg,
90*u.deg,
frame=AltAz(
obstime=t,
location=self.observer
)
).transform_to(ICRS)
ra.append(zenith.ra)
dec.append(zenith.dec)
if hasattr(self, "custom_ho_coordinates"):
custom_az.append(0*u.deg)
custom_el.append(90*u.deg)
else:
# there is a match
ra.append(ras[mask][-1]) # -1 because there may be several matches
dec.append(decs[mask][-1])
if hasattr(self, "custom_ho_coordinates"):
if self.custom_ho_coordinates.shape == (1, 1):
# Transit observation
custom_az.append(self.custom_ho_coordinates.az[0, 0].deg*u.deg)
custom_el.append(self.custom_ho_coordinates.alt[0, 0].deg*u.deg)
elif self.custom_ho_coordinates.shape == (1,):
# Transit observation
custom_az.append(self.custom_ho_coordinates.az[0].deg*u.deg)
custom_el.append(self.custom_ho_coordinates.alt[0].deg*u.deg)
else:
custom_az.append(self.custom_ho_coordinates[mask].az[-1].deg*u.deg)
custom_el.append(self.custom_ho_coordinates[mask].alt[-1].deg*u.deg)
pointing = Pointing(
coordinates=SkyCoord(
ra,
dec,
unit='deg'
),
time=time,
)
if hasattr(self, "custom_ho_coordinates"):
#pointing.custom_ho_coordinates = self.custom_ho_coordinates
pointing.custom_ho_coordinates = SkyCoord(
custom_az,
custom_el,
frame=AltAz(
obstime=time,#.reshape(time.size, 1),
location=nenufar_position
)
)
return pointing
# def __getitem__(self, time: Time):
# """ """
# starts = self.time
# stops = starts + self.duration
# # Ease the comparison by starting a bit earlier
# starts -= TimeDelta(0.01, format='sec')
# az = []
# el = []
# horizontal_coordinates = self.horizontal_coordinates
# for t in time:
# idx = np.argwhere(
# (t >= starts) & (t < stops)
# )
# if idx.size == 1 or idx.size == 2:
# idx = idx[0][0]
# ho_coords = horizontal_coordinates
# az.append(ho_coords[idx][0].az.deg)
# el.append(ho_coords[idx][0].alt.deg)
# elif idx.size == 0:
# log.warning(
# f"Default zenith pointing at {t.isot}."
# )
# az.append(180)
# el.append(90)
# else:
# raise Exception(f'Weird... {idx}')
# return Pointing(
# coordinates=SkyCoord(
# az,
# el,
# unit='deg',
# frame=AltAz(
# obstime=time,
# location=nenufar_position
# )
# ),
# time=time,
# )
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def time(self):
""" Pointing start times.
:setter: Pointing times.
:getter: Pointing times.
:type: :class:`~astropy.time.Time`
"""
return self._time
@time.setter
def time(self, t):
if t.isscalar:
t = t.reshape((1,))
self._time = t
@property
def duration(self):
""" Pointing durations.
If this attribute is scalar, it means that the same duration
should be applied to all the start times defined in
:attr:`~nenupy.astro.pointing.Pointing.time`.
:setter: Pointing durations.
:getter: Pointing durations.
:type: :class:`~astropy.time.TimeDelta`
"""
return self._duration
@duration.setter
def duration(self, d):
if (d.size != 1) and (d.size != self.time.size):
raise IndexError(
f"'duration' of size {d.size} does not match 'time' of size {self.time.size}."
)
self._duration = d
@property
def horizontal_coordinates(self):
""" Horizontal coordinates as seen from :attr:`~nenupy.astro.astro_tools.AstroObject.observer`
at :attr:`~nenupy.astro.pointing.Pointing.time`.
:getter: Horizontal coordinates.
:type: :class:`~astropy.coordinates.SkyCoord`
"""
# Coord/time dimensions of a pointing instance are expected
# to be identical. Therefore, only the diagonal terms are needed.
# Otherwise, radec_to_altaz will recompute altaz for each
# radec and each time by default.
altaz = super().horizontal_coordinates
if altaz.size == 1:
return altaz
elif altaz.shape[0] == altaz.size:
return altaz
else:
return altaz[np.identity(altaz.shape[0], dtype=bool)]
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def plot(self, **kwargs):
""" Plots the elevation and azimuth versus time for the current pointing.
:param figsize:
Size of the figure. Default is ``(10, 5)``.
:type figsize:
`tuple`
:param figname:
File name of the figure to save. Default is ``''``,
i.e. show the figure without saving it.
:type figname:
`str`
:param title:
Set the title of the figure.
:type title:
`str`
:param display_duration:
Switch the display of :attr:`~nenupy.astro.pointing.Pointing.duration`.
If set to ``True`` a grey time window is added to all pointing point,
representing the duration of each individual pointing.
Default is ``False``.
:type display_duration:
`bool`
"""
altaz = self.horizontal_coordinates
fig, axs = plt.subplots(
2,
1,
sharex=True,
figsize=kwargs.get('figsize', (10, 5))
)
fig.subplots_adjust(hspace=0)
axs[0].set_title(kwargs.get("title", ""))
# Elevation plot
axs[0].set_ylabel("Elevation (deg)")
if kwargs.get("display_duration", False):
starts = self.time
stops = self.time + self.duration
for start, stop in zip(starts, stops):
axs[0].axvspan(start.datetime, stop.datetime, alpha=0.5, facecolor="grey", edgecolor=None)
axs[0].plot(self.time.datetime, altaz.alt.deg, marker="o", markersize=3)
# Azimuth plot
axs[1].set_ylabel("Azimuth (deg)")
if kwargs.get("display_duration", False):
for start, stop in zip(starts, stops):
axs[1].axvspan(start.datetime, stop.datetime, alpha=0.5, facecolor="grey", edgecolor=None)
axs[1].plot(self.time.datetime, altaz.az.deg, marker="o", markersize=3)
axs[1].set_xlabel(f"Time (since {self.time[0].isot})")
if kwargs.get("figname", "") != "":
plt.savefig(
kwargs.get("figname"),
dpi=300,
transparent=True,
bbox_inches="tight"
)
else:
plt.show()
plt.close('all')
# def to_stmoc(self):
# """ """
[docs]
@classmethod
def from_bst(cls,
bst,
beam: int = 0,
analog: bool = True,
max_points: int = 100
):
""" Instantiates a class:`~nenupy.astro.pointing.Pointing` object from
a :class:`~nenupy.io.bst.BST` object.
:param bst:
:type bst:
:param beam:
:type beam:
`int`
:param analog:
:type analog:
`bool`
:param max_points:
:type max_points:
:returns:
Pointing derived from a NenuFAR BST file.
:rtype:
:class:`~nenupy.astro.pointing.Pointing`
"""
bst.beam = beam
if analog:
time, az, el = bst.analog_pointing
else:
time, az, el = bst.digital_pointing
if time.size == 1:
# for a transit with multiple ABeams
az = np.append(az, [0]*u.deg)
el = np.append(el, [90]*u.deg)
time = time.insert(1, bst.time[-1])
if time.size > max_points:
julian_days = time.jd
jd_rebin = np.linspace(julian_days[0], julian_days[-1], max_points)
az = np.interp(jd_rebin, julian_days, az)
el = np.interp(jd_rebin, julian_days, el)
time = Time(jd_rebin, format='jd')
altaz_coords = SkyCoord(
az[:-1],
el[:-1],
frame=AltAz(
obstime=time[:-1],
location=nenufar_position
)
)
pointing = cls(
coordinates=altaz_to_radec(altaz=altaz_coords),
time=time[:-1],
duration=time[1:] - time[:-1],
observer=nenufar_position
)
pointing.custom_ho_coordinates = altaz_coords
return pointing
[docs]
@classmethod
def from_file(cls,
file_name,
beam_index: int = 0,
include_corrections: bool = True
):
""" Instantiates a :class:`~nenupy.astro.pointing.Pointing` object from
a NenuFAR pointing file.
Several beam pointings (analog and/or numerical) could be described
in ``file_name``. The argument ``beam_index`` allows for the selection
of one of them.
:param file_name:
NenuFAR pointing file, either analog (ending with ``.altazA``) or
numerical (ending with ``.altazB``).
:type file_name:
`str`
:param beam_index:
Beam number to take into account.
:type beam_index:
`int`
:param include_corrections:
Include or not the pointing corrections (default is ``True``).
Only has an effect on analog beam corrections.
:type include_corrections:
`bool`
:return:
Pointing derived from a NenuFAR pointing file.
:rtype:
:class:`~nenupy.astro.pointing.Pointing`
:Example:
>>> from nenupy.astro.pointing import Pointing
>>> pointing = Pointing.from_file(
file_name=".../20211104_170000_20211104_200000_JUPITER_TRACKING.altazA",
beam_index=1
)
"""
if file_name.endswith('.altazA'):
try:
pointing = np.loadtxt(
file_name,
skiprows=3,
comments=";",
dtype={
'names': ('time', 'anabeam', 'az', 'el', 'az_cor', 'el_cor', 'freq', 'el_eff'),
'formats': ('U20', 'i4', 'f4', 'f4', 'f4', 'f4', 'U5', 'f4')
}
)
available_beams = pointing["anabeam"]
pointing = pointing[available_beams == beam_index]
azimuths = pointing["az_cor"] if include_corrections else pointing["az"]
elevations = pointing["el_eff"] if include_corrections else pointing["el"]
except ValueError:
# No correction
pointing = np.loadtxt(
file_name,
skiprows=3,
comments=";",
dtype={
'names': ('time', 'anabeam', 'az', 'el', 'freq', 'el_eff'),
'formats': ('U20', 'i4', 'f4', 'f4', 'U5', 'f4')
}
)
available_beams = pointing["anabeam"]
pointing = pointing[available_beams == beam_index]
azimuths = pointing["az"]
elevations = pointing["el_eff"] if include_corrections else pointing["el"]
except IndexError:
# No beamsquint
pointing = np.loadtxt(
file_name,
skiprows=3,
comments=";",
dtype={
'names': ('time', 'anabeam', 'az', 'el', 'az_cor', 'el_cor'),
'formats': ('U20', 'i4', 'f4', 'f4', 'f4', 'f4')
}
)
available_beams = pointing["anabeam"]
pointing = pointing[available_beams == beam_index]
azimuths = pointing["az_cor"] if include_corrections else pointing["az"]
elevations = pointing["el_cor"] if include_corrections else pointing["el"]
if pointing.size == 0:
raise ValueError(
f"Empty pointing, check the beam_index={beam_index} value "
f"(avalaible beam indices: {np.unique(available_beams)})."
)
times = Time(pointing["time"])
azimuths *= u.deg
elevations *= u.deg
if times.size == 1:
# for a transit with multiple ABeams
azimuths = np.append(azimuths, [0]*u.deg)
elevations = np.append(elevations, [90]*u.deg)
times = times.insert(1, times[-1] + TimeDelta(1, format="sec"))
duration = times[1:] - times[:-1]
times = times[:-1]
altaz_coords = SkyCoord(
azimuths[:-1],
elevations[:-1],
frame=AltAz(
obstime=times,
location=nenufar_position
)
)
elif file_name.endswith('.altazB'):
pointing = np.loadtxt(
file_name,
skiprows=2,
comments=";",
dtype={
'names': ('time', 'anabeam', 'digibeam', 'az', 'el', 'l', 'm', 'n'),
'formats': ('U20', 'i4', 'i4', 'f4', 'f4', 'f4', 'f4', 'f4')
}
)
available_beams = pointing["digibeam"]
pointing = pointing[available_beams == beam_index]
if pointing.size == 0:
raise ValueError(
f"Empty pointing, check the beam_index={beam_index} value "
f"(avalaible beam indices: {np.unique(available_beams)})."
)
times = Time(pointing["time"])
duration = times[1:] - times[:-1]
# Add the last duration at the end (supposed to be 10 seconds)
duration = duration.insert(-1, TimeDelta(10, format="sec", scale=duration.scale))
altaz_coords = SkyCoord(
pointing['az'],
pointing["el"],
unit="deg",
frame=AltAz(
obstime=times,
location=nenufar_position
)
)
pointing = cls(
coordinates=altaz_to_radec(altaz=altaz_coords),
time=times,
duration=duration,
observer=nenufar_position
)
pointing.custom_ho_coordinates = altaz_coords
return pointing
[docs]
@classmethod
def target_tracking(cls,
target: Target,
time: Time,
duration: TimeDelta = TimeDelta(3600, format="sec"),
observer: EarthLocation = nenufar_position,
):
""" Instantiates a :class:`~nenupy.astro.pointing.Pointing`
object that tracks a given celestial source target.
:param target:
Celestial source target to track.
:type target:
:class:`~nenupy.astro.target.Target`
:param time:
Start times of the pointing aiming at ``target``.
:type time:
:class:`~astropy.time.Time`
:param duration:
Duration of each individual pointing. If this argument is
a scalar, then it will be applied to every start time (defined
in ``time``).
Default is one hour.
:type duration:
:class:`~astropy.time.TimeDelta`
:param observer:
Earth location from where the target is observed.
Default is NenuFAR's location.
:type observer:
:class:`~astropy.coordinates.EarthLocation`
:return:
Pointing derived while tracking a specific target.
:rtype:
:class:`~nenupy.astro.pointing.Pointing`
:Example:
>>> from nenupy.astro.pointing import Pointing
>>> from nenupy.astro.target import FixedTarget
>>> from astropy.time import Time, TimeDelta
>>> import numpy as np
>>> cyg_a = FixedTarget.from_name("Cyg A")
>>> pointing = Pointing.target_tracking(
target=cyg_a,
time=Time("2021-01-01 00:00:00") + np.arange(10)*TimeDelta(1800, format="sec"),
duration=TimeDelta(np.ones(10)*1200, format="sec")
)
"""
return cls(
coordinates=target._get_source_coordinates(time),
time=time,
duration=duration,
observer=observer
)
[docs]
@classmethod
def target_transit(cls,
target: Target,
t_min: Time,
duration: TimeDelta = TimeDelta(3600, format="sec"),
dt: TimeDelta = TimeDelta(10, format="sec"),
azimuth: u.Quantity = 180*u.deg,
observer: EarthLocation = nenufar_position,
):
""" Instantiates a :class:`~nenupy.astro.pointing.Pointing`
object around a source transit.
The next transit at a given ``azimuth`` is search from ``t_min``.
The pointing is then centered at the transit, for a ``duration`` period.
The pointing is made in steps numbered as ``duration/dt``.
:param target:
Celestial source target transiting.
:type target:
:class:`~nenupy.astro.target.Target`
:param t_min:
Time from which the next transit is searched for.
:type t_min:
:class:`~astropy.time.Time`
:param duration:
Total duration of the pointing, centered on the transit time.
Default is one hour.
:type duration:
:class:`~astropy.time.TimeDelta`
:param dt:
Time steps of individual pointings.
Default is 10 sec.
:type dt:
:class:`~astropy.time.TimeDelta`
:param azimuth:
Azimuth at which the transit is computed.
A ``ValueError`` exception is raised if the selected
``target`` does not cross the required ``azimuth`` value.
Default is 180 deg (i.e., South).
:type azimuth:
:class:`~astropy.units.Quantity`
:param observer:
Earth location from where the target is observed.
Default is NenuFAR's location.
:type observer:
:class:`~astropy.coordinates.EarthLocation`
:return:
Pointing centered around a given target transit.
:rtype:
:class:`~nenupy.astro.pointing.Pointing`
:Example:
>>> from nenupy.astro.pointing import Pointing
>>> from astropy.time import Time, TimeDelta
>>> pointing = Pointing.target_transit(
target=cyg_a,
time=Time("2021-01-01 00:00:00"),
duration=TimeDelta(7200, format="sec"),
azimuth=180*u.deg
)
"""
transit_time = target.azimuth_transit(azimuth=azimuth, t_min=t_min)
if transit_time.size == 0:
raise ValueError(
f"The selected target does not cross azimuth={azimuth}."
)
elif transit_time.size != 1:
# Possibly 2 crossings if the source is circumpolar
# get the one at maximal elevation
# target_altaz = radec_to_altaz(radec=target.coordinates, time=transit_time, fast_compute=True)[:, 0]
# transit_time = transit_time[target_altaz.alt.argmax()]
transit_time = transit_time[0].reshape((1,))
transit_altaz = radec_to_altaz(radec=target.coordinates, time=transit_time, fast_compute=True)
time_steps = np.floor(duration/dt)
time_steps = time_steps + 1 if time_steps%2 == 0 else time_steps
dt_shifts = np.arange(time_steps) - (time_steps - 1)/2
pointing_times = transit_time + dt_shifts*dt
pointing = cls(
coordinates=SkyCoord(
np.repeat(transit_altaz.az.deg, pointing_times.size),
np.repeat(transit_altaz.alt.deg, pointing_times.size),
unit="deg",
frame=AltAz(
obstime=pointing_times,
location=nenufar_position
)
).transform_to(ICRS),
time=pointing_times,
duration=dt,
observer=observer
)
pointing.custom_ho_coordinates = transit_altaz
return pointing
# @classmethod
# def snapshot(cls,
# target: Target,
# time: Time,
# duration: TimeDelta = TimeDelta(1, format="sec"),
# observer: EarthLocation = nenufar_position
# ):
# """ Instantiates a :class:`~nenupy.astro.pointing.Pointing`
# object as a simple snapshot (i.e., single pointing).
# :param target:
# Celestial source target to track.
# :type target:
# :class:`~nenupy.astro.target.Target`
# :param time:
# Start time of the pointing aiming at ``target``.
# :type time:
# :class:`~astropy.time.Time`
# :param duration:
# Duration of each individual pointing. If this argument is
# a scalar, then it will be applied to every start time (defined
# in ``time``).
# Default is one hour.
# :type duration:
# :class:`~astropy.time.TimeDelta`
# :param observer:
# Earth location from where the target is observed.
# Default is NenuFAR's location.
# :type observer:
# :class:`~astropy.coordinates.EarthLocation`
# :return:
# Pointing derived from a NenuFAR pointing file.
# :rtype:
# :class:`~nenupy.astro.pointing.Pointing`
# :Example:
# >>> from nenupy.astro.pointing import Pointing
# >>> pointing = Pointing.from_file(
# file_name=".../20211104_170000_20211104_200000_JUPITER_TRACKING.altazA",
# beam_index=1
# )
# """
# return cls(
# coordinates=target._get_source_coordinates(time=time),
# time=time,
# duration=duration,
# observer=observer
# )
[docs]
@classmethod
def zenith_tracking(cls,
time: Time,
duration: TimeDelta = TimeDelta(1, format="sec"),
observer: EarthLocation = nenufar_position
):
""" Instantiates a :class:`~nenupy.astro.pointing.Pointing`
object at the local zenith.
:param time:
Start times of the zenith pointing.
:type t_min:
:class:`~astropy.time.Time`
:param duration:
Duration of each individual pointing. If this argument is
a scalar, then it will be applied to every start time (defined
in ``time``).
Default is one hour.
:type duration:
:class:`~astropy.time.TimeDelta`
:param observer:
Earth location from where the target is observed.
Default is NenuFAR's location.
:type observer:
:class:`~astropy.coordinates.EarthLocation`
:return:
Pointing fixed at the local zenith.
:rtype:
:class:`~nenupy.astro.pointing.Pointing`
:Example:
>>> from nenupy.astro.pointing import Pointing
>>> from astropy.time import Time, TimeDelta
>>> pointing = Pointing.target_transit(
target=cyg_a,
time=Time("2021-01-01 00:00:00"),
duration=TimeDelta(7200, format="sec"),
azimuth=180*u.deg
)
"""
az = 0
el = 90
if not time.isscalar:
az = np.repeat(az, time.size)
el = np.repeat(el, time.size)
altaz = SkyCoord(
az,
el,
unit="deg",
frame=AltAz(
obstime=time,
location=observer
)
)
pointing = cls(
coordinates=altaz_to_radec(
altaz=altaz,
fast_compute=False
),
time=time,
duration=duration,
observer=observer
)
pointing.custom_ho_coordinates = altaz.reshape((1,)) if altaz.isscalar else altaz
return pointing
# ============================================================= #
# ============================================================= #