#! /usr/bin/python3
# -*- coding: utf-8 -*-
r"""
.. _schedule_constraints:
*******************************
Observation constraints classes
*******************************
In the following, an instance of :class:`~nenupy.schedule.targets.ESTarget`
is initialized for the source *Cygnus A* and stored in the variable
``target``. The method :meth:`~nenupy.schedule.targets._Target.computePosition`
is called to compute all astronomical properties at the
location of `NenuFAR <https://nenufar.obs-nancay.fr/en/astronomer/>`_
for the time-range ``times``.
.. code-block:: python
:emphasize-lines: 3,8,9
>>> from astropy.time import Time, TimeDelta
>>> import numpy as np
>>> from nenupy.schedule import ESTarget
>>> dt = TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt
>>> target = ESTarget.fromName('Cas A')
>>> target.computePosition(times)
.. seealso::
:class:`~nenupy.schedule.targets.ESTarget` or
:class:`~nenupy.schedule.targets.ESTarget` objects are
described in :ref:`schedule_targets` in more details.
Single constraint
-----------------
There are several constraints that could be defined.
The simplest and probably most
basic one is the 'elevation constraint' (embedded in the
:class:`~nenupy.schedule.constraints.ElevationCnst` class) since
observing at :math:`e > 0^\circ` is a necessary requirement
for a ground-based observatory.
Once the constraint has been evaluated on ``target``,
a normalized 'score' is returned and can be plotted using the
:meth:`~nenupy.schedule.constraints.Constraint.plot` method.
.. code-block:: python
>>> from nenupy.schedule import ElevationCnst
>>> c = ElevationCnst()
>>> score = c(target)
>>> c.plot()
.. image:: ../_images/elevconstraint_casa0deg.png
:width: 800
If the required elevation to perform a good-quality observation
needs to be greater than a given value, it could be specified
to the :attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`
attribute.
.. code-block:: python
>>> c = ElevationCnst(elevationMin=40)
>>> score = c(target)
>>> c.plot()
.. image:: ../_images/elevconstraint_casa40deg.png
:width: 800
The :class:`~nenupy.schedule.constraints.ElevationCnst`'s score is
now at :math:`0` whenever the ``target`` elevation is lower than
:attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`.
Multiple constraints
--------------------
Several constraints are often needed to select an appropriate
time window for a given observation. The
:class:`~nenupy.schedule.constraints.Constraints` class is by
default initialized with ``ElevationCnst(elevationMin=0)``, but
any other constraint may be passed as arguments:
.. code-block:: python
>>> from nenupy.schedule import (
ESTarget,
Constraints,
ElevationCnst,
MeridianTransitCnst,
LocalTimeCnst
)
>>> from astropy.time import Time, TimeDelta
>>> from astropy.coordinates import SkyCoord, Angle
>>> dts = np.arange(24+1)*TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + dts
>>> target = ESTarget.fromName('Cas A')
>>> target.computePosition(times)
>>> cnst = Constraints(
ElevationCnst(elevationMin=20, weight=3),
MeridianTransitCnst(),
LocalTimeCnst(Angle(12, 'hour'), Angle(4, 'hour'))
)
>>> cnst.evaluate(target, times)
>>> cnst.plot()
.. image:: ../_images/sch_constraints.png
:width: 800
.. inheritance-diagram:: nenupy.schedule.constraints
:parts: 3
"""
# TO DO : CENTRER SUR L AZIMUTH / transit au meridien
__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2021, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'Constraint',
'TargetConstraint',
'ScheduleConstraint',
'ElevationCnst',
'MeridianTransitCnst',
'AzimuthCnst',
'LocalSiderealTimeCnst',
'LocalTimeCnst',
"UTCHourCnst",
'TimeRangeCnst',
'NightTimeCnst',
'PeriodicCnst',
'Constraints'
]
from abc import ABC
import numpy as np
from astropy.time import Time, TimeDelta
from astropy.coordinates import Angle, Longitude
import pytz
import matplotlib.pyplot as plt
import matplotlib as mpl
from copy import copy
from typing import List, Tuple, Union
from nenupy.schedule.targets import _Target, SSTarget
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ------------------------ Constraint ------------------------- #
# ============================================================= #
[docs]
class Constraint(ABC):
""" Base class for all the constraint definitions.
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, weight=1):
self.score = 1
self.weight = weight
[docs]
def __call__(self, *arg) -> np.ndarray:
"""Evaluate the constraint against the schedule properties.
Returns
-------
:class:`~numpyp.ndarray`
The constraint score evaluated at each time step.
Raises
------
AttributeError
If the constraint does not have an `_evaluate()` method.
"""
if not hasattr(self, "_evaluate"):
raise AttributeError(
"<Constraint> should not be used on its own."
)
return self._evaluate(*arg)
def __str__(self):
return f'{self.__class__}'
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def weight(self) -> Union[int, float]:
""" Relative weight associated to the constraint.
The constraint score of an observation block within a given schedule
is ultimately taken at the weighted mean among all the constraints
associated to this observaiton request.
"""
return self._weight
@weight.setter
def weight(self, w: Union[int, float]):
if not isinstance(w, (float, int)):
raise TypeError(
"<weight> should be a number."
)
elif w <= 0:
raise ValueError(
"<weight> should be > 0."
)
self._weight = w
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def plot(self, fig: mpl.figure.Figure = None, ax: mpl.axes.Axes = None, **kwargs) -> None:
""" Plot the constraint's score previously evaluated.
Parameters
----------
fig : :class:`~matplotlib.figure.Figure`
Figure instance to re-use for plotting purposes, by default `None` (i.e., a new :class:`~matplotlib.figure.Figure` will be created)
ax : :class:`~matplotlib.axes.Axes`
Ax instance to re-use for plotting purposes, by default `None` (i.e., a new :class:`~matplotlib.axes.Axes` will be created)
Other Parameters
----------------
figsize : [`float`, `float`]
Size of the figure, by default ``(10, 5)``
figname : `str`
Name of the figure to be saved, by default ``''`` (i.e., the figure is only displayed)
marker : `str`
Plot marker type (see :func:`matplotlib.pyplot.plot`), by default ``'.'``
linestyle : `str`
Plot line style (see :func:`matplotlib.pyplot.plot`), by default ``':'``
linewidth: `int`
Plot line width (see :func:`matplotlib.pyplot.plot`), by default ``1``
"""
fig = plt.figure(
figsize=kwargs.get("figsize", (10, 5))
)
ax = fig.add_subplot()
self._plot_constraint(ax=ax, **kwargs)
ax.set_xlabel("Time index")
ax.set_ylabel("Constraint score")
ax.set_title(f"{self}")
# Save or show the figure
figname = kwargs.get("figname", "")
if figname != "":
fig.savefig(
figname,
dpi=300,
bbox_inches="tight",
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close("all")
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _plot_constraint(self, ax: mpl.axes.Axes,**kwargs) -> None:
""" Internal method to plot a single constraint without
initializing the figure object.
"""
ax.plot(
np.where(
np.isnan(self.score),
0,
self.score
),
marker=kwargs.get("marker", "."),
linestyle=kwargs.get("linestyle", ":"),
linewidth=kwargs.get("linewidth", 1),
label=f"{self.__class__}"
)
@staticmethod
def _is_numpy_instance(arr):
""" Check that arr is a genuine numpy array.
"""
if not isinstance(arr, np.ndarray):
raise TypeError(
f'{np.ndarray} object expected.'
)
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------------- TargetConstraint ---------------------- #
# ============================================================= #
[docs]
class TargetConstraint(Constraint):
""" Base class for constraints involving target property checks.
.. warning::
:class:`~nenupy.schedule.constraints.TargetConstraint`
should not be used on its own.
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, weight):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
@staticmethod
def _is_target_instance(target):
"""
"""
if not isinstance(target, _Target):
raise TypeError(
f'{_Target} object expected.'
)
@staticmethod
def _pass_angle_instance(angle):
"""
"""
if not isinstance(angle, Angle):
try:
angle = Angle(angle, unit="deg")
except:
raise Exception("An angle is expected.")
return angle
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------- ScheduleConstraint --------------------- #
# ============================================================= #
[docs]
class ScheduleConstraint(Constraint):
""" Base class for constraints involving time range checks.
.. warning::
:class:`~nenupy.schedule.constraints.ScheduleConstraint`
should not be used on its own.
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, weight):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
@staticmethod
def _is_time_instance(time):
"""
"""
if not isinstance(time, Time):
raise TypeError(
f'{Time.__class__} object expected.'
)
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- ElevationCnst ----------------------- #
# ============================================================= #
[docs]
class ElevationCnst(TargetConstraint):
""" Elevation constraint
:param elevationMin:
Target's elevation below which the constraint score
is null. If provided as a dimensionless quantity,
the value is interpreted as degrees.
:type elevationMin:
`int`, `float`, or :class:`~astropy.coordinates.Angle`
:param weight:
Weight of the constraint. Allows to ponderate each
constraint with respect to each other if
:class:`~nenupy.schedule.constraint.ElevationCnst`
is included in :class:`~nenupy.schedule.constraint.Constraints`
for instance.
:type weight:
`int` or `float`
.. versionadded:: 1.2.0
:Example:
>>> from astropy.time import Time, TimeDelta
>>> from nenupy.schedule.targets import ESTarget
>>> from nenupy.schedule.constraints import ElevationCnst
>>> dt = TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt
>>> cas_a = ESTarget.fromName('Cas A')
>>> cas_a.computePosition(times)
>>> elevation_constraint = ElevationCnst()
>>> score = elevation_constraint(target, None)
>>> c.plot()
"""
[docs]
def __init__(self, elevationMin=0., scale_elevation=True, weight=1):
super().__init__(weight=weight)
self.elevationMin = elevationMin
self.scale_elevation = scale_elevation
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def elevationMin(self):
""" Minimal elevation required to perform an observation.
:type: `float` or :class:`~astropy.coordinates.Angle`
"""
return self._elevationMin
@elevationMin.setter
def elevationMin(self, emin):
emin = self._pass_angle_instance(emin)
if (emin.deg < 0.) or (emin.deg > 90):
raise ValueError(
f'`elevationMin`={emin.deg} deg must fall '
'between 0 and 90 degrees.'
)
self._elevationMin = emin
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
r""" Computes the :class:`~nenupy.schedule.constraint.ElevationCnst`'s
score for the given ``indices``.
The score is computed as:
.. math::
{\rm score} = \left\langle \frac{\mathbf{e}(t)}{{\rm max}(\mathbf{e})} \right\rangle_{\rm indices}
where :math:`\mathbf{e}(t)` is the elevation of the
target (set to :math:`0` whenever it is lower than
:attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`).
:param indices:
Indices of :class:`~nenupy.schedule.constraint.Constraint.score`
on which the score will be evaluated.
:type indices:
:class:`~numpy.ndarray`
:returns:
Constraint score.
:rtype: `float`
"""
# aboveMin = self.score[indices] > 0
# return np.mean(self.score[indices][aboveMin])
# return np.mean(self.score[indices])
# return np.mean(
# np.where(
# np.isnan(self.score[indices]),
# 0,
# self.score[indices]
# )
# )
return np.mean(
np.where(
self.score[indices]>0.,
1,
0
)
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots = None):
""" Evaluates the constraint :class:`~nenupy.schedule.constraint.ElevationCnst`
on the ``target`` which astronomical positions need
to be computed first (using :meth:`~nenupy.schedule.targets._Target.computePosition`).
:param target:
Target for which :class:`~nenupy.schedule.constraint.ElevationCnst`
should be evaluated.
:type target: :class:`~nenupy.schedule.targets._Target`
:returns:
Constraint's score.
:rtype: :class:`~numpy.ndarray`
"""
self._is_target_instance(target)
elevation = target.elevation.deg
elevMean = (elevation[1:] + elevation[:-1])/2
# elevMean[elevMean <= self.elevationMin.deg] = 0.
elevMean[elevMean <= self.elevationMin.deg] = np.nan
# if elevMax == 0.:
if all(np.isnan(elevMean)):
log.warning(
"Constraint <ElevationConstraint(elevationMin="
f"{self.elevationMin})> evaluated for target "
f"'{target.target}' cannot be satisfied over the "
"given time range."
)
# self.score = elevation[1:]*0.
self.score = elevation[1:] * np.nan
elif not self.scale_elevation:
self.score = elevMean/elevMean
else:
elevMax = np.nanmax(elevMean)
self.score = elevMean/elevMax
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------- MeridianTransitCnst -------------------- #
# ============================================================= #
[docs]
class MeridianTransitCnst(TargetConstraint):
""" Meridian Transit constraint
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, weight=1):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
r""" Computes the :class:`~nenupy.schedule.constraint.MeridianTransitCnst`'s
score for the given ``indices``.
Returns 1 if the merdian transit is within the indices
The score is computed as:
.. math::
{\rm score} = \begin{cases}
1, t_{\rm transit} \in \mathbf{t}({\rm indices})\\
0, t_{\rm transit} \notin \mathbf{t}({\rm indices})
\end{cases}
where :math:`t_{\rm transit}` is the meridian transit time
and :math:`\mathbf{t}` is the time range on which the
target positions are computed.
:param indices:
Indices of :class:`~nenupy.schedule.constraint.Constraint.score`
on which the score will be evaluated.
:type indices:
:class:`~numpy.ndarray`
:returns:
Constraint score.
:rtype: `float`
"""
self._is_numpy_instance(indices)
#return np.sum(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
self._is_target_instance(target)
hourAngle = target.hourAngle
transitIdx = np.where(
(np.roll(hourAngle, -1) - hourAngle)[:-1] < 0
)[0]
scores = np.zeros(hourAngle.size)
# Set neighbor slots to non-zero to maximize centering
slotShifts = np.arange(nslots) - int(np.floor(nslots/2))
neighborIdx = (transitIdx[:, None] + slotShifts[None, :]).ravel()
outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size)
neighborIdx = np.delete(
neighborIdx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighborIdx
),
0.5,
scores
)
# Set transit slots to maximal score
scores[transitIdx] = 1.
self.score = scores[:-1]
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ AzimuthCnst ------------------------ #
# ============================================================= #
[docs]
class AzimuthCnst(TargetConstraint):
"""
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, azimuth, weight=1):
super().__init__(weight=weight)
self.azimuth = azimuth
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def azimuth(self):
"""
"""
return self._azimuth
@azimuth.setter
def azimuth(self, az):
az = self._pass_angle_instance(az)
if (az.deg < 0.) or (az.deg > 360):
raise ValueError(
f'`azimuth`={az.deg} deg must fall '
'between 0 and 360 degrees.'
)
self._azimuth = az
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
# return np.sum(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
self._is_target_instance(target)
azimuths = target.azimuth.rad
az = self.azimuth.rad
if target.isCircumpolar:
az = np.angle(np.cos(az) + 1j*np.sin(az))
complexAzStarts = np.angle(
np.cos(azimuths[:-1]) + 1j*np.sin(azimuths[:-1])
)
complexAzStops = np.angle(
np.cos(azimuths[1:]) + 1j*np.sin(azimuths[1:])
)
mask = (az >= complexAzStarts) &\
(az <= complexAzStops)
mask |= (az <= complexAzStarts) &\
(az >= complexAzStops)
else:
mask = (az >= azimuths[:-1]) &\
(az <= azimuths[1:])
scores = np.zeros(mask.size)
azIndices = np.where(mask)[0]
# Set neighbor slots to non-zero to maximize centering
slotShifts = np.arange(nslots) - int(np.floor(nslots/2))
if nslots%2 == 0:
# if nslots is even, we have to think which slot to include in addition to the max one.
# print(np.abs(azimuths[azIndices] - az))
pass
neighborIdx = (azIndices[:, None] + slotShifts[None, :]).ravel()
outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size)
neighborIdx = np.delete(
neighborIdx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighborIdx
),
0.5,
scores
)
# Set azimuth found slots to maximal score
scores[azIndices] = 1.
self.score = scores
# self.score = mask.astype(float)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------- LocalSiderealTimeCnst ------------------- #
# ============================================================= #
[docs]
class LocalSiderealTimeCnst(TargetConstraint):
[docs]
def __init__(self, lst: Longitude, strict_crossing: bool = True, weight: float = 1):
super().__init__(weight=weight)
self.lst = lst
self.strict_crossing = strict_crossing
# --------------------------------------------------------- #
# ------------------------ Methods ------------------b------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
# return np.mean(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
""" time: dim + 1
"""
self._is_target_instance(target)
pi_lon = Longitude(180, unit='deg')
lst_starts = copy(target._lst[:-1])
lst_stops = copy(target._lst[1:])
lst_desired = np.repeat(self.lst, lst_starts.size)
# Deal with crossing 360/0 case
tricky_mask = lst_starts > lst_stops
lst_starts[tricky_mask] = (lst_starts[tricky_mask] - pi_lon).wrap_at('360d')
lst_stops[tricky_mask] = (lst_stops[tricky_mask] - pi_lon).wrap_at('360d')
lst_desired[tricky_mask] = (lst_desired[tricky_mask] - pi_lon).wrap_at('360d')
mask = (lst_desired >= lst_starts) & (lst_desired <= lst_stops)
# Compute the score
scores = np.zeros(mask.size)
lst_crossing_indices = np.where(mask)[0]
if self.strict_crossing:
# Set neighbor slots to non-zero to maximize centering
slot_shifts = np.arange(nslots) - int(np.floor(nslots/2))
if nslots%2 == 0:
# if nslots is even, we have to think which slot to include in addition to the max one.
pass
neighbor_idx = (lst_crossing_indices[:, None] + slot_shifts[None, :]).ravel()
outOfBounds = (neighbor_idx < 0) + (neighbor_idx >= scores.size)
neighbor_idx = np.delete(
neighbor_idx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighbor_idx
),
0.5,
scores
)
# Set azimuth found slots to maximal score
scores[lst_crossing_indices] = 1.
else:
# Number of consecutive false in the array
scores[lst_crossing_indices] = 1
bool_array = ~scores.astype(bool)
consecutive_falses = np.diff(
np.where(
np.concatenate(
([bool_array[0]],
bool_array[:-1] != bool_array[1:],
[True])
)
)[0]
)[::2].max()
kernel_size = consecutive_falses + 1 if consecutive_falses%2==0 else consecutive_falses
kernel = np.arange(kernel_size) - int(np.floor(kernel_size/2))
kernel = 1. - np.abs(kernel)/kernel.max()
scores = np.convolve(scores, kernel, mode='same')
self.score = scores
# self.score = mask.astype(float)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- LocalTimeCnst ----------------------- #
# ============================================================= #
[docs]
class LocalTimeCnst(ScheduleConstraint):
"""
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, hMin, hMax, weight=1):
super().__init__(weight=weight)
self.hMin = hMin
self.hMax = hMax
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def hMin(self):
"""
"""
return self._hMin
@hMin.setter
def hMin(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._hMin = h
@property
def hMax(self):
"""
"""
return self._hMax
@hMax.setter
def hMax(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._hMax = h
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time, nslots):
"""
"""
self._is_time_instance(time)
# Convert time to France local time (take into account
# daylight savings)
tz = pytz.timezone('Europe/Paris')
timezoneTime = map(tz.localize, time.datetime)
utcOffset = np.array(
[tt.utcoffset().total_seconds() for tt in timezoneTime]
)
localTime = time + TimeDelta(utcOffset, format='sec')
# Convert the 'hour' part in decimal 'angle' values
hours = np.array([tt.split()[1] for tt in localTime.iso])
localHours = Angle(hours, unit='hour').hour
# Selection
if self.hMin > self.hMax:
# If 'midnight' is in the range
mask = (localHours <= self.hMin.hour) &\
(localHours >= self.hMax.hour)
mask = ~mask
else:
mask = (localHours >= self.hMin.hour) &\
(localHours <= self.hMax.hour)
score = mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ UTCHourCnst ------------------------ #
# ============================================================= #
[docs]
class UTCHourCnst(ScheduleConstraint):
"""Contraint to force an observation block to be scheduled between two fixed UTC hours.
"""
[docs]
def __init__(self, h_min: Angle, h_max: Angle, weight: float = 1):
super().__init__(weight=weight)
self.h_min = h_min
self.h_max = h_max
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def h_min(self):
"""
"""
return self._h_min
@h_min.setter
def h_min(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._h_min = h
@property
def h_max(self):
"""
"""
return self._h_max
@h_max.setter
def h_max(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._h_max = h
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time, nslots):
"""
"""
self._is_time_instance(time)
# Convert the 'hour' part in decimal 'angle' values
hours = np.array([tt.split()[1] for tt in time.iso])
local_hours = Angle(hours, unit='hour').hour
# Selection
if self.h_min > self.h_max:
# If 'midnight' is in the range
mask = (local_hours <= self.h_min.hour) &\
(local_hours >= self.h_max.hour)
mask = ~mask
else:
mask = (local_hours >= self.h_min.hour) &\
(local_hours <= self.h_max.hour)
score = mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- TimeRangeCnst ----------------------- #
# ============================================================= #
[docs]
class TimeRangeCnst(ScheduleConstraint):
"""
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, time_min, time_max, weight=1):
super().__init__(weight=weight)
self.time_min = time_min
self.time_max = time_max
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def time_min(self):
"""
"""
return self._time_min
@time_min.setter
def time_min(self, t):
if not isinstance(t, Time):
raise TypeError(
f'{t} should be of type {type(Time)}.'
)
self._time_min = t
@property
def time_max(self):
"""
"""
return self._time_max
@time_max.setter
def time_max(self, t):
if not isinstance(t, Time):
raise TypeError(
f'{t} should be of type {type(Time)}.'
)
self._time_max = t
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time, nslots):
""" time: dim + 1
"""
self._is_time_instance(time)
jds = time.jd
if self.time_min.isscalar:
mask = (jds >= self.time_min.jd) & (jds <= self.time_max.jd)
else:
mask = np.sum(
(jds[:, None] >= self.time_min.jd) & (jds[:, None] <= self.time_max.jd),
axis=1,
dtype=bool
)
score = mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- NightTimeCnst ----------------------- #
# ============================================================= #
[docs]
class NightTimeCnst(Constraint):
""" """
[docs]
def __init__(self, sun_elevation_max=0., scale_elevation=True, weight=1):
super().__init__(weight=weight)
self.sun_elevation_max = sun_elevation_max
self.scale_elevation = scale_elevation
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
def get_score(self, indices):
""" """
return np.mean(
np.where(
self.score[indices]>0.,
1,
0
)
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, sun_elevation):
"""
"""
sun_elevation[sun_elevation > self.sun_elevation_max] = np.nan
if all(np.isnan(sun_elevation)):
log.warning(
"Constraint <NightTimeCnst(sun_elevation_max="
f"{self.sun_elevation_max})> cannot be satisfied"
"over the given time range."
)
self.score = sun_elevation[1:]*np.nan
elif not self.scale_elevation:
self.score = sun_elevation/sun_elevation
else:
elevation_min = np.nanmin(sun_elevation)
self.score = sun_elevation/elevation_min
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------------- SunProximityCnst ---------------------- #
# ============================================================= #
# class SunProximityCnst(Constraint):
# def __init__(self, min_angular_sep=0., max_angular_sep=360, weight=1):
# super().__init__(weight=weight)
# self.min_angular_sep = min_angular_sep
# self.max_angular_sep = max_angular_sep
# # --------------------------------------------------------- #
# # ------------------------ Methods ------------------------ #
# def get_score(self, indices):
# """ """
# return np.mean(
# np.where(
# self.score[indices]>0.,
# 1,
# 0
# )
# )
# # --------------------------------------------------------- #
# # ----------------------- Internal ------------------------ #
# def _evaluate(self, target, sun):
# """
# """
# separation = target._fk5.separation(sun._fk5).deg
# proximity_mask = (separation >= self.min_angular_sep) * (separation <= self.max_angular_sep)
# score = proximity_mask[:-1].astype(float)
# self.score = np.where(score==0, np.nan, score)
# return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- PeriodicCnst ------------------------ #
# ============================================================= #
[docs]
class PeriodicCnst(ScheduleConstraint):
"""Constraint to force a block to be scheduled within periodical time intervals in the schedule.
Examples
--------
.. code-block:: python
>>> from nenupy.schedule.constraints import PeriodicCnst
>>> from astropy.time import Time, TimeDelta
>>> start = Time("2025-12-01 00:00:00")
>>> p = TimeDelta(2, format="jd")
>>> tol = TimeDelta(0.3, format="jd")
>>> cnt = PeriodicCnst(
start_time=start,
periodicity=p,
tolerance=tol
)
>>> times = start + np.arange(300) * TimeDelta(1800, format="sec")
>>> score = cnt(times)
>>> fig = plt.figure(figsize=(10, 5))
>>> ax = fig.add_subplot()
>>> for i in range(4):
>>> middle = start + i * p
>>> ax.axvline(middle.datetime, linestyle=":", color="black")
>>> ax.axvspan((middle - tol).datetime, (middle + tol).datetime, color="tab:orange", alpha=0.5)
>>> ax.plot(times[:-1].datetime, np.ones(times[:-1].size), marker=".", linestyle="")
>>> mask = ~ np.isnan(score)
>>> ax.plot(times[:-1].datetime[mask], np.ones(times[:-1].size)[mask], marker=".", linestyle="")
>>> ax.set_xlabel("Time")
.. figure:: ../_images/schedule_images/periodic_cnst.png
:width: 650
:align: center
"""
[docs]
def __init__(self,
start_time: Time,
periodicity: TimeDelta,
tolerance: TimeDelta,
weight: float = 1
):
"""Return an instance of :class:`~nenupy.schedule.constraints.PeriodicCnst`.
Parameters
----------
start_time : :class:`~astropy.time.Time`
Center of the first time interval.
periodicity : :class:`~astropy.time.TimeDelta`
Time duration before the next repetition of the time interval.
tolerance : :class:`~astropy.time.TimeDelta`
half-width of each time interval.
weight : `float`, optional
Constraint weight used to ponderate constraints with respect to one another, by default 1
"""
super().__init__(weight=weight)
self.start_time = start_time
self.periodicity = periodicity
self.tolerance = tolerance
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices: np.ndarray) -> float:
"""Return the constraint score evaluated on a schedule ``indices``.
The constraint must have been evaluated first.
Parameters
----------
indices : :class:`~numpyp.ndarray`
Time indices on which the constraint score should be computed.
The indices must match the array stored in :attr:`~nenupy.scedule.constraints.Constraint.score`.
Returns
-------
`float`
The constraint score evaluated as its average on ``indices``.
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time: Tuple, nslots: int = None) -> np.ndarray:
"""
"""
self._is_time_instance(time)
integer_time_div = (time - self.start_time).jd % self.periodicity.jd
time_mask = (integer_time_div <= self.tolerance.jd) + (integer_time_div >= self.periodicity.jd - self.tolerance.jd)
score = time_mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------- SolarProximityCnst --------------------- #
# ============================================================= #
[docs]
class SolarProximityCnst(Constraint):
""" """
[docs]
def __init__(self,
closer: bool = False,
min_separation_deg: float = None,
max_separation_deg: float = None,
scale: bool = True,
weight: float = 1
):
super().__init__(weight=weight)
self.closer = closer
self.min_separation_deg = min_separation_deg
self.max_separation_deg = max_separation_deg
self.scale = scale
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
def get_score(self, indices: np.ndarray) -> np.ndarray:
""" """
return np.mean(
np.where(
self.score[indices] > 0.,
1,
0
)
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target: _Target, sun_position: SSTarget):
"""
"""
separation = target.target.separation(sun_position).deg
# Set to NaN intervals where the separation is outside requested values
if not (self.min_separation_deg is None):
separation[separation <= self.min_separation_deg] = np.nan
if not (self.max_separation_deg is None):
separation[separation > self.max_separation_deg] = np.nan
if all(np.isnan(separation)):
log.warning(
"Constraint <SolarProximityCnst(min_separation_deg="
f"{self.min_separation_deg}, max_separation_deg={self.max_separation_deg})> "
"cannot be satisfied over the given time range."
)
# self.score = separation[1:] * np.nan
self.score = separation * np.nan
else:
# sep_min = np.nanmin(separation)
sep_max = np.nanmax(separation)
# farther_normalized_score = (separation - sep_min) / (sep_max - sep_min)
# if self.closer:
# self.score = 1 - farther_normalized_score
# else:
# self.score = farther_normalized_score
if self.closer:
# Add .1 to avoid 0
self.score = (sep_max + 0.1 - separation) / np.nanmax(sep_max + 0.1 - separation)
else:
self.score = separation / sep_max
if not self.scale:
self.score = np.where(self.score > 0, 1, self.score)
return self.score
else:
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ Constraints ------------------------ #
# ============================================================= #
[docs]
class Constraints(object):
"""
.. versionadded:: 1.2.0
"""
[docs]
def __init__(self, *constraints):
self._default_el = False
self.constraints = constraints
self.score = None
# Check they are all of unique type
unique, count = np.unique(
[str(cons.__class__) for cons in self.constraints],
return_counts=True
)
if any(count > 1):
message = (
'There can only be one constraint type per '
'observation:'
)
for typ, n in zip(unique, count):
if n > 1:
message += f"\n\t* '{typ}': {n} instances."
raise ValueError(message)
# Add the elevation constraint by default
#if not str(ElevationCnst) in unique:
if not np.any(np.isin(np.array([str(ElevationCnst)]), unique)):
self.constraints += (ElevationCnst(0.),)
self._default_el = True
def __add__(self, other):
"""
"""
if not isinstance(other, Constraint):
raise TypeError('')
if isinstance(other, ElevationCnst):
if self._default_el:
# Remove the default elevation constraint
# if a new one is added
cs = np.array([str(c.__class__) for c in self.constraints])
elCnst_idx = np.where(cs == str(ElevationCnst))[0][0]
listCnst = list(self.constraints)
listCnst.pop(elCnst_idx)
self.constraints = tuple(listCnst)
self._default_el = False
constraints = self.constraints + (other,)
cts = Constraints(*constraints)
cts._default_el = self._default_el
return cts
def __getitem__(self, n):
"""
"""
return self.constraints[n]
def __len__(self):
"""
"""
return len(self.constraints)
def __del__(self):
for constraint in self.constraints:
del constraint
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def size(self):
"""
"""
return len(self)
@property
def weights(self):
"""
"""
return np.array([cnt.weight for cnt in self])
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def evaluate(self, target, time, nslots=1, sun_elevation=None, sun_position=None):
"""
"""
cnts = np.zeros((self.size, time.size - 1))
unEvaluatedCnst = 0
for i, cnt in enumerate(self):
if isinstance(cnt, TargetConstraint) and (target is not None):
cnts[i, :] = cnt(target, nslots)
elif isinstance(cnt, ScheduleConstraint):
cnts[i, :] = cnt(time, nslots)
elif isinstance(cnt, NightTimeCnst):
cnts[i, :] = cnt(sun_elevation)
elif isinstance(cnt, SolarProximityCnst):
cnts[i, :] = cnt(target, sun_position)
else:
unEvaluatedCnst += 1
if unEvaluatedCnst == self.size:
cnts += 1.
log.debug(
'No defined constraint could be used. Schedule '
'slot scores have been set to 1...'
)
score = np.average(cnts, weights=self.weights, axis=0)
self.score = np.where(
np.isnan(score),
0,
score
)
self.score[np.where(np.prod(cnts, axis=0)==0)[0]] = 0
return self.score
def evaluate_various_nslots(self, target: _Target, time: Time, test_indices: List[np.ndarray], sun_elevation=None, sun_position=None) -> List[np.ndarray]:
scores = []
# Loop over each set of indices
for indices_window in test_indices:
# Copy target and only keep the time/coordinates around the boundaries of test_indices to optimize the computations
min_idx, max_idx = np.min(indices_window), np.max(indices_window) + 1
if target is None:
sliced_target = None
else:
sliced_target = target[min_idx : max_idx] # Perform a copy without damaging the original object
sliced_time = time[min_idx : max_idx]
if sun_elevation is None:
sun_elev = None
else:
sun_elev = sun_elevation[min_idx : max_idx]
if sun_position is None:
sun_pos = None
else:
sun_pos = sun_position[min_idx : max_idx]
nslots = indices_window.size - 1
# Evaluate the constraints
n_unevaluated_constraints = 0
constraint_scores = np.zeros((self.size, nslots))
for constraint_i, constraint in enumerate(self):
constraint = copy(constraint) # to prevent overwriting initial constraints
if isinstance(constraint, TargetConstraint) and (sliced_target is not None):
constraint_scores[constraint_i, :] = constraint(sliced_target, nslots)
elif isinstance(constraint, ScheduleConstraint):
constraint_scores[constraint_i, :] = constraint(sliced_time, nslots)
elif isinstance(constraint, NightTimeCnst):
constraint_scores[constraint_i, :] = constraint(sun_elev[:-1]) # already taken at mid dt
elif isinstance(constraint, SolarProximityCnst) and (sliced_target is not None):
constraint_scores[constraint_i, :] = constraint(sliced_target, sun_pos[:-1])
else:
n_unevaluated_constraints += 1
if n_unevaluated_constraints == self.size:
constraint_scores += 1.
log.debug(
"No defined constraint could be used. Schedule "
"slot scores have been set to 1..."
)
# Compute the weighted average score
window_score = np.average(constraint_scores, weights=self.weights, axis=0)
# Replace NaN values by 0
window_score = np.where(np.isnan(window_score), 0, window_score)
# If any constraint is at 0, set the time-score at 0 no matter the other constraints
window_score[np.where(np.prod(constraint_scores, axis=0)==0)[0]] = 0
scores.append(window_score)
# return the scores
return scores
[docs]
def plot(self, **kwargs):
"""
kwargs:
figsize
figname
"""
fig = plt.figure(
figsize=kwargs.get('figsize', (10, 5))
)
ax = fig.add_subplot()
for cnt in self:
# Overplot each constraint
cnt._plot_constraint(ax=ax, **kwargs)
ax.plot(
self.score,
label="Total"
)
ax.set_xlabel("Time index")
ax.set_ylabel("Constraint score")
ax.legend()
# Save or show the figure
figname = kwargs.get("figname", "")
if figname != "":
plt.savefig(
figname,
dpi=300,
bbox_inches="tight",
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close("all")
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
# ============================================================= #
# ============================================================= #