#! /usr/bin/python3
# -*- coding: utf-8 -*-
r"""
.. _schedule_constraints:
*******************************
Observation constraints classes
*******************************
In the following, an instance of :class:`~nenupy.schedule.targets.ESTarget`
is initialized for the source *Cygnus A* and stored in the variable
``target``. The method :meth:`~nenupy.schedule.targets._Target.computePosition`
is called to compute all astronomical properties at the
location of `NenuFAR <https://nenufar.obs-nancay.fr/en/astronomer/>`_
for the time-range ``times``.
.. code-block:: python
:emphasize-lines: 3,8,9
>>> from astropy.time import Time, TimeDelta
>>> import numpy as np
>>> from nenupy.schedule import ESTarget
>>> dt = TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt
>>> target = ESTarget.fromName('Cas A')
>>> target.computePosition(times)
.. seealso::
:class:`~nenupy.schedule.targets.ESTarget` or
:class:`~nenupy.schedule.targets.ESTarget` objects are
described in :ref:`schedule_targets` in more details.
Single constraint
-----------------
There are several constraints that could be defined (see
:ref:`constraints_summary`). The simplest and probably most
basic one is the 'elevation constraint' (embedded in the
:class:`~nenupy.schedule.constraints.ElevationCnst` class) since
observing at :math:`e > 0^\circ` is a necessary requirement
for a ground-based observatory.
Once the constraint has been evaluated on ``target``,
a normalized 'score' is returned and can be plotted using the
:meth:`~nenupy.schedule.constraints.Constraint.plot` method.
.. code-block:: python
>>> from nenupy.schedule import ElevationCnst
>>> c = ElevationCnst()
>>> score = c(target)
>>> c.plot()
.. image:: ./_images/elevconstraint_casa0deg.png
:width: 800
If the required elevation to perform a good-quality observation
needs to be greater than a given value, it could be specified
to the :attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`
attribute.
.. code-block:: python
>>> c = ElevationCnst(elevationMin=40)
>>> score = c(target)
>>> c.plot()
.. image:: ./_images/elevconstraint_casa40deg.png
:width: 800
The :class:`~nenupy.schedule.constraints.ElevationCnst`'s score is
now at :math:`0` whenever the ``target`` elevation is lower than
:attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`.
Multiple constraints
--------------------
Several constraints are often needed to select an appropriate
time window for a given observation. The
:class:`~nenupy.schedule.constraints.Constraints` class is by
default initialized with ``ElevationCnst(elevationMin=0)``, but
any other constraint may be passed as arguments:
.. code-block:: python
>>> from nenupy.schedule import (
ESTarget,
Constraints,
ElevationCnst,
MeridianTransitCnst,
LocalTimeCnst
)
>>> from astropy.time import Time, TimeDelta
>>> from astropy.coordinates import SkyCoord, Angle
>>> dts = np.arange(24+1)*TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + dts
>>> target = ESTarget.fromName('Cas A')
>>> target.computePosition(times)
>>> cnst = Constraints(
ElevationCnst(elevationMin=20, weight=3),
MeridianTransitCnst(),
LocalTimeCnst(Angle(12, 'hour'), Angle(4, 'hour'))
)
>>> cnst.evaluate(target, times)
>>> cnst.plot()
.. image:: ./_images/sch_constraints.png
:width: 800
.. _constraints_summary:
Constraint classes
------------------
.. autosummary::
~nenupy.schedule.constraints.Constraints
~nenupy.schedule.constraints.ElevationCnst
~nenupy.schedule.constraints.MeridianTransitCnst
~nenupy.schedule.constraints.AzimuthCnst
~nenupy.schedule.constraints.LocalTimeCnst
~nenupy.schedule.constraints.TimeRangeCnst
~nenupy.schedule.constraints.LocalSiderealTimeCnst
~nenupy.schedule.constraints.NightTimeCnst
.. inheritance-diagram:: nenupy.schedule.constraints
:parts: 3
"""
# TO DO : CENTRER SUR L AZIMUTH / transit au meridien
__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2021, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'Constraint',
'TargetConstraint',
'ScheduleConstraint',
'ElevationCnst',
'MeridianTransitCnst',
'AzimuthCnst',
'LocalSiderealTimeCnst',
'LocalTimeCnst',
'TimeRangeCnst',
'NightTimeCnst',
'Constraints'
]
from abc import ABC
import numpy as np
from astropy.time import Time, TimeDelta
from astropy.coordinates import Angle, Longitude
import pytz
import matplotlib.pyplot as plt
from copy import copy
from typing import List
from nenupy.schedule.targets import _Target
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ------------------------ Constraint ------------------------- #
# ============================================================= #
[docs]
class Constraint(ABC):
""" Base class for all the constraint definitions.
.. versionadded:: 1.2.0
"""
def __init__(self, weight=1):
self.score = 1
self.weight = weight
def __call__(self, *arg):
""" Test de docstring
"""
if not hasattr(self, '_evaluate'):
raise AttributeError(
'<Constraint> should not be used on its own.'
)
return self._evaluate(*arg)
def __str__(self):
"""
"""
return f'{self.__class__}'
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def weight(self):
"""
"""
return self._weight
@weight.setter
def weight(self, w):
if not isinstance(w, (float, int)):
raise TypeError(
'<weight> should be a number.'
)
elif w <= 0:
raise ValueError(
'<weight> should be > 0.'
)
self._weight = w
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def plot(self, **kwargs):
""" Plots the constraint's score previously evaluated.
:param figsize:
Size of the figure. Default: ``(10, 5)``.
:type figsize:
`tuple`
:param figname:
Name of the figure to be stored. Default: ``''``,
the figure is only displayed.
:type figname:
`str`
:param marker:
Plot marker type (see :func:`matplotlib.pyplot.plot`).
Default: ``'.'``.
:type marker:
`str`
:param linestyle:
Plot line style (see :func:`matplotlib.pyplot.plot`).
Default: ``':'``
:type linestyle:
`str`
:param linewidth:
Plot line width (see :func:`matplotlib.pyplot.plot`).
Default: ``1``
:type linewidth:
`int` or `float`
"""
fig = plt.figure(
figsize=kwargs.get('figsize', (10, 5))
)
self._plot_constraint(**kwargs)
plt.xlabel('Time index')
plt.ylabel('Constraint score')
plt.title(f'{self.__class__}')
# Save or show the figure
figname = kwargs.get('figname', '')
if figname != '':
plt.savefig(
figname,
dpi=300,
bbox_inches='tight',
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close('all')
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _plot_constraint(self, **kwargs):
""" Internal method to plot a single constraint without
initializing the figure object.
"""
plt.plot(
np.where(
np.isnan(self.score),
0,
self.score
),
marker=kwargs.get('marker', '.'),
linestyle=kwargs.get('linestyle', ':'),
linewidth=kwargs.get('linewidth', 1),
label=f'{self.__class__}'
)
@staticmethod
def _is_numpy_instance(arr):
""" Check that arr is a genuine numpy array.
"""
if not isinstance(arr, np.ndarray):
raise TypeError(
f'{np.ndarray} object expected.'
)
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------------- TargetConstraint ---------------------- #
# ============================================================= #
[docs]
class TargetConstraint(Constraint):
""" Base class for constraints involving target property checks.
.. warning::
:class:`~nenupy.schedule.constraints.TargetConstraint`
should not be used on its own.
.. versionadded:: 1.2.0
"""
def __init__(self, weight):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
@staticmethod
def _is_target_instance(target):
"""
"""
if not isinstance(target, _Target):
raise TypeError(
f'{_Target} object expected.'
)
@staticmethod
def _pass_angle_instance(angle):
"""
"""
if not isinstance(angle, Angle):
angle = Angle(angle, unit='deg')
return angle
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------- ScheduleConstraint --------------------- #
# ============================================================= #
[docs]
class ScheduleConstraint(Constraint):
""" Base class for constraints involving time range checks.
.. warning::
:class:`~nenupy.schedule.constraints.ScheduleConstraint`
should not be used on its own.
.. versionadded:: 1.2.0
"""
def __init__(self, weight):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
@staticmethod
def _is_time_instance(time):
"""
"""
if not isinstance(time, Time):
raise TypeError(
f'{Time.__class__} object expected.'
)
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- ElevationCnst ----------------------- #
# ============================================================= #
[docs]
class ElevationCnst(TargetConstraint):
""" Elevation constraint
:param elevationMin:
Target's elevation below which the constraint score
is null. If provided as a dimensionless quantity,
the value is interpreted as degrees.
:type elevationMin:
`int`, `float`, or :class:`~astropy.coordinates.Angle`
:param weight:
Weight of the constraint. Allows to ponderate each
constraint with respect to each other if
:class:`~nenupy.schedule.constraint.ElevationCnst`
is included in :class:`~nenupy.schedule.constraint.Constraints`
for instance.
:type weight:
`int` or `float`
.. versionadded:: 1.2.0
:Example:
>>> from astropy.time import Time, TimeDelta
>>> from nenupy.schedule.targets import ESTarget
>>> from nenupy.schedule.constraints import ElevationCnst
>>> dt = TimeDelta(3600, format='sec')
>>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt
>>> cas_a = ESTarget.fromName('Cas A')
>>> cas_a.computePosition(times)
>>> elevation_constraint = ElevationCnst()
>>> score = elevation_constraint(target, None)
>>> c.plot()
"""
def __init__(self, elevationMin=0., scale_elevation=True, weight=1):
super().__init__(weight=weight)
self.elevationMin = elevationMin
self.scale_elevation = scale_elevation
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def elevationMin(self):
""" Minimal elevation required to perform an observation.
:type: `float` or :class:`~astropy.coordinates.Angle`
"""
return self._elevationMin
@elevationMin.setter
def elevationMin(self, emin):
emin = self._pass_angle_instance(emin)
if (emin.deg < 0.) or (emin.deg > 90):
raise ValueError(
f'`elevationMin`={emin.deg} deg must fall '
'between 0 and 90 degrees.'
)
self._elevationMin = emin
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
r""" Computes the :class:`~nenupy.schedule.constraint.ElevationCnst`'s
score for the given ``indices``.
The score is computed as:
.. math::
{\rm score} = \left\langle \frac{\mathbf{e}(t)}{{\rm max}(\mathbf{e})} \right\rangle_{\rm indices}
where :math:`\mathbf{e}(t)` is the elevation of the
target (set to :math:`0` whenever it is lower than
:attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`).
:param indices:
Indices of :class:`~nenupy.schedule.constraint.Constraint.score`
on which the score will be evaluated.
:type indices:
:class:`~numpy.ndarray`
:returns:
Constraint score.
:rtype: `float`
"""
# aboveMin = self.score[indices] > 0
# return np.mean(self.score[indices][aboveMin])
# return np.mean(self.score[indices])
# return np.mean(
# np.where(
# np.isnan(self.score[indices]),
# 0,
# self.score[indices]
# )
# )
return np.mean(
np.where(
self.score[indices]>0.,
1,
0
)
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
""" Evaluates the constraint :class:`~nenupy.schedule.constraint.ElevationCnst`
on the ``target`` which astronomical positions need
to be computed first (using :meth:`~nenupy.schedule.targets._Target.computePosition`).
:param target:
Target for which :class:`~nenupy.schedule.constraint.ElevationCnst`
should be evaluated.
:type target: :class:`~nenupy.schedule.targets._Target`
:returns:
Constraint's score.
:rtype: :class:`~numpy.ndarray`
"""
self._is_target_instance(target)
elevation = target.elevation.deg
elevMean = (elevation[1:] + elevation[:-1])/2
# elevMean[elevMean <= self.elevationMin.deg] = 0.
elevMean[elevMean <= self.elevationMin.deg] = np.nan
# if elevMax == 0.:
if all(np.isnan(elevMean)):
log.warning(
"Constraint <ElevationConstraint(elevationMin="
f"{self.elevationMin})> evaluated for target "
f"'{target.target}' cannot be satisfied over the "
"given time range."
)
# self.score = elevation[1:]*0.
self.score = elevation[1:]*np.nan
elif not self.scale_elevation:
self.score = elevMean/elevMean
else:
elevMax = np.nanmax(elevMean)
self.score = elevMean/elevMax
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# -------------------- MeridianTransitCnst -------------------- #
# ============================================================= #
[docs]
class MeridianTransitCnst(TargetConstraint):
""" Meridian Transit constraint
.. versionadded:: 1.2.0
"""
def __init__(self, weight=1):
super().__init__(weight=weight)
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
r""" Computes the :class:`~nenupy.schedule.constraint.MeridianTransitCnst`'s
score for the given ``indices``.
Returns 1 if the merdian transit is within the indices
The score is computed as:
.. math::
{\rm score} = \begin{cases}
1, t_{\rm transit} \in \mathbf{t}({\rm indices})\\
0, t_{\rm transit} \notin \mathbf{t}({\rm indices})
\end{cases}
where :math:`t_{\rm transit}` is the meridian transit time
and :math:`\mathbf{t}` is the time range on which the
target positions are computed.
:param indices:
Indices of :class:`~nenupy.schedule.constraint.Constraint.score`
on which the score will be evaluated.
:type indices:
:class:`~numpy.ndarray`
:returns:
Constraint score.
:rtype: `float`
"""
self._is_numpy_instance(indices)
#return np.sum(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
self._is_target_instance(target)
hourAngle = target.hourAngle
transitIdx = np.where(
(np.roll(hourAngle, -1) - hourAngle)[:-1] < 0
)[0]
scores = np.zeros(hourAngle.size)
# Set neighbor slots to non-zero to maximize centering
slotShifts = np.arange(nslots) - int(np.floor(nslots/2))
neighborIdx = (transitIdx[:, None] + slotShifts[None, :]).ravel()
outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size)
neighborIdx = np.delete(
neighborIdx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighborIdx
),
0.5,
scores
)
# Set transit slots to maximal score
scores[transitIdx] = 1.
self.score = scores[:-1]
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ AzimuthCnst ------------------------ #
# ============================================================= #
[docs]
class AzimuthCnst(TargetConstraint):
"""
.. versionadded:: 1.2.0
"""
def __init__(self, azimuth, weight=1):
super().__init__(weight=weight)
self.azimuth = azimuth
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def azimuth(self):
"""
"""
return self._azimuth
@azimuth.setter
def azimuth(self, az):
az = self._pass_angle_instance(az)
if (az.deg < 0.) or (az.deg > 360):
raise ValueError(
f'`azimuth`={az.deg} deg must fall '
'between 0 and 360 degrees.'
)
self._azimuth = az
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
# return np.sum(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
self._is_target_instance(target)
azimuths = target.azimuth.rad
az = self.azimuth.rad
if target.isCircumpolar:
az = np.angle(np.cos(az) + 1j*np.sin(az))
complexAzStarts = np.angle(
np.cos(azimuths[:-1]) + 1j*np.sin(azimuths[:-1])
)
complexAzStops = np.angle(
np.cos(azimuths[1:]) + 1j*np.sin(azimuths[1:])
)
mask = (az >= complexAzStarts) &\
(az <= complexAzStops)
mask |= (az <= complexAzStarts) &\
(az >= complexAzStops)
else:
mask = (az >= azimuths[:-1]) &\
(az <= azimuths[1:])
scores = np.zeros(mask.size)
azIndices = np.where(mask)[0]
# Set neighbor slots to non-zero to maximize centering
slotShifts = np.arange(nslots) - int(np.floor(nslots/2))
if nslots%2 == 0:
# if nslots is even, we have to think which slot to include in addition to the max one.
# print(np.abs(azimuths[azIndices] - az))
pass
neighborIdx = (azIndices[:, None] + slotShifts[None, :]).ravel()
outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size)
neighborIdx = np.delete(
neighborIdx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighborIdx
),
0.5,
scores
)
# Set azimuth found slots to maximal score
scores[azIndices] = 1.
self.score = scores
# self.score = mask.astype(float)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------- LocalSiderealTimeCnst ------------------- #
# ============================================================= #
[docs]
class LocalSiderealTimeCnst(TargetConstraint):
def __init__(self, lst: Longitude, strict_crossing: bool = True, weight: float = 1):
super().__init__(weight=weight)
self.lst = lst
self.strict_crossing = strict_crossing
# --------------------------------------------------------- #
# ------------------------ Methods ------------------b------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
# return np.mean(self.score[indices], axis=-1)
return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, target, nslots):
""" time: dim + 1
"""
self._is_target_instance(target)
pi_lon = Longitude(180, unit='deg')
lst_starts = copy(target._lst[:-1])
lst_stops = copy(target._lst[1:])
lst_desired = np.repeat(self.lst, lst_starts.size)
# Deal with crossing 360/0 case
tricky_mask = lst_starts > lst_stops
lst_starts[tricky_mask] = (lst_starts[tricky_mask] - pi_lon).wrap_at('360d')
lst_stops[tricky_mask] = (lst_stops[tricky_mask] - pi_lon).wrap_at('360d')
lst_desired[tricky_mask] = (lst_desired[tricky_mask] - pi_lon).wrap_at('360d')
mask = (lst_desired >= lst_starts) & (lst_desired <= lst_stops)
# Compute the score
scores = np.zeros(mask.size)
lst_crossing_indices = np.where(mask)[0]
if self.strict_crossing:
# Set neighbor slots to non-zero to maximize centering
slot_shifts = np.arange(nslots) - int(np.floor(nslots/2))
if nslots%2 == 0:
# if nslots is even, we have to think which slot to include in addition to the max one.
pass
neighbor_idx = (lst_crossing_indices[:, None] + slot_shifts[None, :]).ravel()
outOfBounds = (neighbor_idx < 0) + (neighbor_idx >= scores.size)
neighbor_idx = np.delete(
neighbor_idx,
np.argwhere(outOfBounds)
)
scores = np.where(
np.isin(
np.arange(scores.size, dtype=int),
neighbor_idx
),
0.5,
scores
)
# Set azimuth found slots to maximal score
scores[lst_crossing_indices] = 1.
else:
# Number of consecutive false in the array
scores[lst_crossing_indices] = 1
bool_array = ~scores.astype(bool)
consecutive_falses = np.diff(
np.where(
np.concatenate(
([bool_array[0]],
bool_array[:-1] != bool_array[1:],
[True])
)
)[0]
)[::2].max()
kernel_size = consecutive_falses + 1 if consecutive_falses%2==0 else consecutive_falses
kernel = np.arange(kernel_size) - int(np.floor(kernel_size/2))
kernel = 1. - np.abs(kernel)/kernel.max()
scores = np.convolve(scores, kernel, mode='same')
self.score = scores
# self.score = mask.astype(float)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- LocalTimeCnst ----------------------- #
# ============================================================= #
[docs]
class LocalTimeCnst(ScheduleConstraint):
"""
.. versionadded:: 1.2.0
"""
def __init__(self, hMin, hMax, weight=1):
super().__init__(weight=weight)
self.hMin = hMin
self.hMax = hMax
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def hMin(self):
"""
"""
return self._hMin
@hMin.setter
def hMin(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._hMin = h
@property
def hMax(self):
"""
"""
return self._hMax
@hMax.setter
def hMax(self, h):
if not isinstance(h, Angle):
raise TypeError(
f'{h} should be of type {type(Angle)}.'
)
self._hMax = h
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time, nslots):
"""
"""
self._is_time_instance(time)
# Convert time to France local time (take into account
# daylight savings)
tz = pytz.timezone('Europe/Paris')
timezoneTime = map(tz.localize, time.datetime)
utcOffset = np.array(
[tt.utcoffset().total_seconds() for tt in timezoneTime]
)
localTime = time + TimeDelta(utcOffset, format='sec')
# Convert the 'hour' part in decimal 'angle' values
hours = np.array([tt.split()[1] for tt in localTime.iso])
localHours = Angle(hours, unit='hour').hour
# Selection
if self.hMin > self.hMax:
# If 'midnight' is in the range
mask = (localHours <= self.hMin.hour) &\
(localHours >= self.hMax.hour)
mask = ~mask
else:
mask = (localHours >= self.hMin.hour) &\
(localHours <= self.hMax.hour)
score = mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- TimeRangeCnst ----------------------- #
# ============================================================= #
[docs]
class TimeRangeCnst(ScheduleConstraint):
"""
.. versionadded:: 1.2.0
"""
def __init__(self, time_min, time_max, weight=1):
super().__init__(weight=weight)
self.time_min = time_min
self.time_max = time_max
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def time_min(self):
"""
"""
return self._time_min
@time_min.setter
def time_min(self, t):
if not isinstance(t, Time):
raise TypeError(
f'{t} should be of type {type(Time)}.'
)
self._time_min = t
@property
def time_max(self):
"""
"""
return self._time_max
@time_max.setter
def time_max(self, t):
if not isinstance(t, Time):
raise TypeError(
f'{t} should be of type {type(Time)}.'
)
self._time_max = t
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
"""
"""
self._is_numpy_instance(indices)
return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, time, nslots):
""" time: dim + 1
"""
self._is_time_instance(time)
jds = time.jd
if self.time_min.isscalar:
mask = (jds >= self.time_min.jd) & (jds <= self.time_max.jd)
else:
mask = np.sum(
(jds[:, None] >= self.time_min.jd) & (jds[:, None] <= self.time_max.jd),
axis=1,
dtype=bool
)
score = mask[:-1].astype(float)
self.score = np.where(score==0, np.nan, score)
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- NightTimeCnst ----------------------- #
# ============================================================= #
[docs]
class NightTimeCnst(Constraint):
""" """
def __init__(self, sun_elevation_max=0., scale_elevation=True, weight=1):
super().__init__(weight=weight)
self.sun_elevation_max = sun_elevation_max
self.scale_elevation = scale_elevation
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def get_score(self, indices):
""" """
return np.mean(
np.where(
self.score[indices]>0.,
1,
0
)
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _evaluate(self, sun_elevation):
"""
"""
sun_elevation[sun_elevation > self.sun_elevation_max] = np.nan
if all(np.isnan(sun_elevation)):
log.warning(
"Constraint <NightTimeCnst(sun_elevation_max="
f"{self.sun_elevation_max})> cannot be satisfied"
"over the given time range."
)
self.score = sun_elevation[1:]*np.nan
elif not self.scale_elevation:
self.score = sun_elevation/sun_elevation
else:
elevation_min = np.nanmin(sun_elevation)
self.score = sun_elevation/elevation_min
return self.score
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ Constraints ------------------------ #
# ============================================================= #
[docs]
class Constraints(object):
"""
.. versionadded:: 1.2.0
"""
def __init__(self, *constraints):
self._default_el = False
self.constraints = constraints
self.score = None
# Check they are all of unique type
unique, count = np.unique(
[str(cons.__class__) for cons in self.constraints],
return_counts=True
)
if any(count > 1):
message = (
'There can only be one constraint type per '
'observation:'
)
for typ, n in zip(unique, count):
if n > 1:
message += f"\n\t* '{typ}': {n} instances."
raise ValueError(message)
# Add the elevation constraint by default
#if not str(ElevationCnst) in unique:
if not np.any(np.isin(np.array([str(ElevationCnst)]), unique)):
self.constraints += (ElevationCnst(0.),)
self._default_el = True
def __add__(self, other):
"""
"""
if not isinstance(other, Constraint):
raise TypeError('')
if isinstance(other, ElevationCnst):
if self._default_el:
# Remove the default elevation constraint
# if a new one is added
cs = np.array([str(c.__class__) for c in self.constraints])
elCnst_idx = np.where(cs == str(ElevationCnst))[0][0]
listCnst = list(self.constraints)
listCnst.pop(elCnst_idx)
self.constraints = tuple(listCnst)
self._default_el = False
constraints = self.constraints + (other,)
cts = Constraints(*constraints)
cts._default_el = self._default_el
return cts
def __getitem__(self, n):
"""
"""
return self.constraints[n]
def __len__(self):
"""
"""
return len(self.constraints)
def __del__(self):
for constraint in self.constraints:
del constraint
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def size(self):
"""
"""
return len(self)
@property
def weights(self):
"""
"""
return np.array([cnt.weight for cnt in self])
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def evaluate(self, target, time, nslots=1, sun_elevation=None):
"""
"""
cnts = np.zeros((self.size, time.size - 1))
unEvaluatedCnst = 0
for i, cnt in enumerate(self):
if isinstance(cnt, TargetConstraint) and (target is not None):
cnts[i, :] = cnt(target, nslots)
elif isinstance(cnt, ScheduleConstraint):
cnts[i, :] = cnt(tuple(time), nslots)
elif isinstance(cnt, NightTimeCnst):
cnts[i, :] = cnt(sun_elevation)
else:
unEvaluatedCnst += 1
if unEvaluatedCnst == self.size:
cnts += 1.
log.debug(
'No defined constraint could be used. Schedule '
'slot scores have been set to 1...'
)
score = np.average(cnts, weights=self.weights, axis=0)
self.score = np.where(
np.isnan(score),
0,
score
)
self.score[np.where(np.prod(cnts, axis=0)==0)[0]] = 0
return self.score
[docs]
def evaluate_various_nslots(self, target: _Target, time: Time, test_indices: List[np.ndarray], sun_elevation=None) -> List[np.ndarray]:
scores = []
# Loop over each set of indices
for indices_window in test_indices:
# Copy target and only keep the time/coordinates around the boundaries of test_indices to optimize the computations
min_idx, max_idx = np.min(indices_window), np.max(indices_window) + 1
sliced_target = None if target is None else target[min_idx : max_idx] # Perform a copy without damaging the original object
time = time[min_idx : max_idx]
sun_elevation = None if sun_elevation is None else sun_elevation[min_idx : max_idx]
nslots = indices_window.size - 1
# Evaluate the constraints
n_unevaluated_constraints = 0
constraint_scores = np.zeros((self.size, nslots))
for constraint_i, constraint in enumerate(self):
constraint = copy(constraint) # to prevent overwriting initial constraints
if isinstance(constraint, TargetConstraint) and (sliced_target is not None):
constraint_scores[constraint_i, :] = constraint(sliced_target, nslots)
elif isinstance(constraint, ScheduleConstraint):
constraint_scores[constraint_i, :] = constraint(tuple(time), nslots)
elif isinstance(constraint, NightTimeCnst):
constraint_scores[constraint_i, :] = constraint(sun_elevation)
else:
n_unevaluated_constraints += 1
if n_unevaluated_constraints == self.size:
constraint_scores += 1.
log.debug(
"No defined constraint could be used. Schedule "
"slot scores have been set to 1..."
)
# Compute the weighted average score
window_score = np.average(constraint_scores, weights=self.weights, axis=0)
# Replace NaN values by 0
window_score = np.where(np.isnan(window_score), 0, window_score)
# If any constraint is at 0, set the time-score at 0 no matter the other constraints
window_score[np.where(np.prod(constraint_scores, axis=0)==0)[0]] = 0
scores.append(window_score)
# return the scores
return scores
[docs]
def plot(self, **kwargs):
"""
kwargs:
figsize
figname
"""
fig = plt.figure(
figsize=kwargs.get('figsize', (10, 5))
)
for cnt in self:
# Overplot each constraint
cnt._plot_constraint(**kwargs)
plt.plot(
self.score,
label='Total'
)
plt.xlabel('Time index')
plt.ylabel('Constraint score')
plt.legend()
# Save or show the figure
figname = kwargs.get('figname', '')
if figname != '':
plt.savefig(
figname,
dpi=300,
bbox_inches='tight',
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close('all')
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
# ============================================================= #
# ============================================================= #