Source code for nenupy.schedule.constraints

#! /usr/bin/python3
# -*- coding: utf-8 -*-


r"""
    .. _schedule_constraints:

    *******************************
    Observation constraints classes
    *******************************

    
    In the following, an instance of :class:`~nenupy.schedule.targets.ESTarget`
    is initialized for the source *Cygnus A* and stored in the variable
    ``target``. The method :meth:`~nenupy.schedule.targets._Target.computePosition`
    is called to compute all astronomical properties at the
    location of `NenuFAR <https://nenufar.obs-nancay.fr/en/astronomer/>`_
    for the time-range ``times``.

    .. code-block:: python
        :emphasize-lines: 3,8,9

        >>> from astropy.time import Time, TimeDelta
        >>> import numpy as np
        >>> from nenupy.schedule import ESTarget

        >>> dt = TimeDelta(3600, format='sec')
        >>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt

        >>> target = ESTarget.fromName('Cas A')
        >>> target.computePosition(times)


    .. seealso::
        :class:`~nenupy.schedule.targets.ESTarget` or 
        :class:`~nenupy.schedule.targets.ESTarget` objects are
        described in :ref:`schedule_targets` in more details.


    Single constraint
    -----------------

    There are several constraints that could be defined.
    The simplest and probably most
    basic one is the 'elevation constraint' (embedded in the 
    :class:`~nenupy.schedule.constraints.ElevationCnst` class) since
    observing at :math:`e > 0^\circ` is a necessary requirement
    for a ground-based observatory.
    Once the constraint has been evaluated on ``target``,
    a normalized 'score' is returned and can be plotted using the
    :meth:`~nenupy.schedule.constraints.Constraint.plot` method.
    
    .. code-block:: python

        >>> from nenupy.schedule import ElevationCnst

        >>> c = ElevationCnst()
        >>> score = c(target)
        >>> c.plot()


    .. image:: ../_images/elevconstraint_casa0deg.png
        :width: 800

    If the required elevation to perform a good-quality observation
    needs to be greater than a given value, it could be specified
    to the :attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`
    attribute.

    .. code-block:: python

        >>> c = ElevationCnst(elevationMin=40)
        >>> score = c(target)
        >>> c.plot()


    .. image:: ../_images/elevconstraint_casa40deg.png
        :width: 800

    The :class:`~nenupy.schedule.constraints.ElevationCnst`'s score is
    now at :math:`0` whenever the ``target`` elevation is lower than
    :attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`.


    Multiple constraints
    --------------------
    
    Several constraints are often needed to select an appropriate
    time window for a given observation. The
    :class:`~nenupy.schedule.constraints.Constraints` class is by
    default initialized with ``ElevationCnst(elevationMin=0)``, but
    any other constraint may be passed as arguments:
    
    .. code-block:: python

        >>> from nenupy.schedule import (
                ESTarget,
                Constraints,
                ElevationCnst,
                MeridianTransitCnst,
                LocalTimeCnst
            )
        >>> from astropy.time import Time, TimeDelta
        >>> from astropy.coordinates import SkyCoord, Angle

        >>> dts = np.arange(24+1)*TimeDelta(3600, format='sec')
        >>> times = Time('2021-01-01 00:00:00') + dts
        >>> target = ESTarget.fromName('Cas A')
        >>> target.computePosition(times)

        >>> cnst = Constraints(
                ElevationCnst(elevationMin=20, weight=3),
                MeridianTransitCnst(),
                LocalTimeCnst(Angle(12, 'hour'), Angle(4, 'hour'))
            )
        
        >>> cnst.evaluate(target, times)
        
        >>> cnst.plot()

    .. image:: ../_images/sch_constraints.png
        :width: 800

    .. inheritance-diagram:: nenupy.schedule.constraints
        :parts: 3

"""

# TO DO : CENTRER SUR L AZIMUTH / transit au meridien

__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2021, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
    'Constraint',
    'TargetConstraint',
    'ScheduleConstraint',
    'ElevationCnst',
    'MeridianTransitCnst',
    'AzimuthCnst',
    'LocalSiderealTimeCnst',
    'LocalTimeCnst',
    "UTCHourCnst",
    'TimeRangeCnst',
    'NightTimeCnst',
    'PeriodicCnst',
    'Constraints'
]


from abc import ABC
import numpy as np
from astropy.time import Time, TimeDelta
from astropy.coordinates import Angle, Longitude
import pytz
import matplotlib.pyplot as plt
import matplotlib as mpl
from copy import copy
from typing import List, Tuple, Union

from nenupy.schedule.targets import _Target, SSTarget

import logging
log = logging.getLogger(__name__)


# ============================================================= #
# ------------------------ Constraint ------------------------- #
# ============================================================= #
[docs] class Constraint(ABC): """ Base class for all the constraint definitions. .. versionadded:: 1.2.0 """
[docs] def __init__(self, weight=1): self.score = 1 self.weight = weight
[docs] def __call__(self, *arg) -> np.ndarray: """Evaluate the constraint against the schedule properties. Returns ------- :class:`~numpyp.ndarray` The constraint score evaluated at each time step. Raises ------ AttributeError If the constraint does not have an `_evaluate()` method. """ if not hasattr(self, "_evaluate"): raise AttributeError( "<Constraint> should not be used on its own." ) return self._evaluate(*arg)
def __str__(self): return f'{self.__class__}' # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def weight(self) -> Union[int, float]: """ Relative weight associated to the constraint. The constraint score of an observation block within a given schedule is ultimately taken at the weighted mean among all the constraints associated to this observaiton request. """ return self._weight @weight.setter def weight(self, w: Union[int, float]): if not isinstance(w, (float, int)): raise TypeError( "<weight> should be a number." ) elif w <= 0: raise ValueError( "<weight> should be > 0." ) self._weight = w # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def plot(self, fig: mpl.figure.Figure = None, ax: mpl.axes.Axes = None, **kwargs) -> None: """ Plot the constraint's score previously evaluated. Parameters ---------- fig : :class:`~matplotlib.figure.Figure` Figure instance to re-use for plotting purposes, by default `None` (i.e., a new :class:`~matplotlib.figure.Figure` will be created) ax : :class:`~matplotlib.axes.Axes` Ax instance to re-use for plotting purposes, by default `None` (i.e., a new :class:`~matplotlib.axes.Axes` will be created) Other Parameters ---------------- figsize : [`float`, `float`] Size of the figure, by default ``(10, 5)`` figname : `str` Name of the figure to be saved, by default ``''`` (i.e., the figure is only displayed) marker : `str` Plot marker type (see :func:`matplotlib.pyplot.plot`), by default ``'.'`` linestyle : `str` Plot line style (see :func:`matplotlib.pyplot.plot`), by default ``':'`` linewidth: `int` Plot line width (see :func:`matplotlib.pyplot.plot`), by default ``1`` """ fig = plt.figure( figsize=kwargs.get("figsize", (10, 5)) ) ax = fig.add_subplot() self._plot_constraint(ax=ax, **kwargs) ax.set_xlabel("Time index") ax.set_ylabel("Constraint score") ax.set_title(f"{self}") # Save or show the figure figname = kwargs.get("figname", "") if figname != "": fig.savefig( figname, dpi=300, bbox_inches="tight", transparent=True ) log.info(f"Figure '{figname}' saved.") else: plt.show() plt.close("all")
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _plot_constraint(self, ax: mpl.axes.Axes,**kwargs) -> None: """ Internal method to plot a single constraint without initializing the figure object. """ ax.plot( np.where( np.isnan(self.score), 0, self.score ), marker=kwargs.get("marker", "."), linestyle=kwargs.get("linestyle", ":"), linewidth=kwargs.get("linewidth", 1), label=f"{self.__class__}" ) @staticmethod def _is_numpy_instance(arr): """ Check that arr is a genuine numpy array. """ if not isinstance(arr, np.ndarray): raise TypeError( f'{np.ndarray} object expected.' )
# ============================================================= # # ============================================================= # # ============================================================= # # --------------------- TargetConstraint ---------------------- # # ============================================================= #
[docs] class TargetConstraint(Constraint): """ Base class for constraints involving target property checks. .. warning:: :class:`~nenupy.schedule.constraints.TargetConstraint` should not be used on its own. .. versionadded:: 1.2.0 """
[docs] def __init__(self, weight): super().__init__(weight=weight)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod def _is_target_instance(target): """ """ if not isinstance(target, _Target): raise TypeError( f'{_Target} object expected.' ) @staticmethod def _pass_angle_instance(angle): """ """ if not isinstance(angle, Angle): try: angle = Angle(angle, unit="deg") except: raise Exception("An angle is expected.") return angle
# ============================================================= # # ============================================================= # # ============================================================= # # -------------------- ScheduleConstraint --------------------- # # ============================================================= #
[docs] class ScheduleConstraint(Constraint): """ Base class for constraints involving time range checks. .. warning:: :class:`~nenupy.schedule.constraints.ScheduleConstraint` should not be used on its own. .. versionadded:: 1.2.0 """
[docs] def __init__(self, weight): super().__init__(weight=weight)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod def _is_time_instance(time): """ """ if not isinstance(time, Time): raise TypeError( f'{Time.__class__} object expected.' )
# ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- ElevationCnst ----------------------- # # ============================================================= #
[docs] class ElevationCnst(TargetConstraint): """ Elevation constraint :param elevationMin: Target's elevation below which the constraint score is null. If provided as a dimensionless quantity, the value is interpreted as degrees. :type elevationMin: `int`, `float`, or :class:`~astropy.coordinates.Angle` :param weight: Weight of the constraint. Allows to ponderate each constraint with respect to each other if :class:`~nenupy.schedule.constraint.ElevationCnst` is included in :class:`~nenupy.schedule.constraint.Constraints` for instance. :type weight: `int` or `float` .. versionadded:: 1.2.0 :Example: >>> from astropy.time import Time, TimeDelta >>> from nenupy.schedule.targets import ESTarget >>> from nenupy.schedule.constraints import ElevationCnst >>> dt = TimeDelta(3600, format='sec') >>> times = Time('2021-01-01 00:00:00') + np.arange(24)*dt >>> cas_a = ESTarget.fromName('Cas A') >>> cas_a.computePosition(times) >>> elevation_constraint = ElevationCnst() >>> score = elevation_constraint(target, None) >>> c.plot() """
[docs] def __init__(self, elevationMin=0., scale_elevation=True, weight=1): super().__init__(weight=weight) self.elevationMin = elevationMin self.scale_elevation = scale_elevation
# --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def elevationMin(self): """ Minimal elevation required to perform an observation. :type: `float` or :class:`~astropy.coordinates.Angle` """ return self._elevationMin @elevationMin.setter def elevationMin(self, emin): emin = self._pass_angle_instance(emin) if (emin.deg < 0.) or (emin.deg > 90): raise ValueError( f'`elevationMin`={emin.deg} deg must fall ' 'between 0 and 90 degrees.' ) self._elevationMin = emin # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): r""" Computes the :class:`~nenupy.schedule.constraint.ElevationCnst`'s score for the given ``indices``. The score is computed as: .. math:: {\rm score} = \left\langle \frac{\mathbf{e}(t)}{{\rm max}(\mathbf{e})} \right\rangle_{\rm indices} where :math:`\mathbf{e}(t)` is the elevation of the target (set to :math:`0` whenever it is lower than :attr:`~nenupy.schedule.constraints.ElevationCnst.elevationMin`). :param indices: Indices of :class:`~nenupy.schedule.constraint.Constraint.score` on which the score will be evaluated. :type indices: :class:`~numpy.ndarray` :returns: Constraint score. :rtype: `float` """ # aboveMin = self.score[indices] > 0 # return np.mean(self.score[indices][aboveMin]) # return np.mean(self.score[indices]) # return np.mean( # np.where( # np.isnan(self.score[indices]), # 0, # self.score[indices] # ) # ) return np.mean( np.where( self.score[indices]>0., 1, 0 ) )
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, target, nslots = None): """ Evaluates the constraint :class:`~nenupy.schedule.constraint.ElevationCnst` on the ``target`` which astronomical positions need to be computed first (using :meth:`~nenupy.schedule.targets._Target.computePosition`). :param target: Target for which :class:`~nenupy.schedule.constraint.ElevationCnst` should be evaluated. :type target: :class:`~nenupy.schedule.targets._Target` :returns: Constraint's score. :rtype: :class:`~numpy.ndarray` """ self._is_target_instance(target) elevation = target.elevation.deg elevMean = (elevation[1:] + elevation[:-1])/2 # elevMean[elevMean <= self.elevationMin.deg] = 0. elevMean[elevMean <= self.elevationMin.deg] = np.nan # if elevMax == 0.: if all(np.isnan(elevMean)): log.warning( "Constraint <ElevationConstraint(elevationMin=" f"{self.elevationMin})> evaluated for target " f"'{target.target}' cannot be satisfied over the " "given time range." ) # self.score = elevation[1:]*0. self.score = elevation[1:] * np.nan elif not self.scale_elevation: self.score = elevMean/elevMean else: elevMax = np.nanmax(elevMean) self.score = elevMean/elevMax return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # -------------------- MeridianTransitCnst -------------------- # # ============================================================= #
[docs] class MeridianTransitCnst(TargetConstraint): """ Meridian Transit constraint .. versionadded:: 1.2.0 """
[docs] def __init__(self, weight=1): super().__init__(weight=weight)
# --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): r""" Computes the :class:`~nenupy.schedule.constraint.MeridianTransitCnst`'s score for the given ``indices``. Returns 1 if the merdian transit is within the indices The score is computed as: .. math:: {\rm score} = \begin{cases} 1, t_{\rm transit} \in \mathbf{t}({\rm indices})\\ 0, t_{\rm transit} \notin \mathbf{t}({\rm indices}) \end{cases} where :math:`t_{\rm transit}` is the meridian transit time and :math:`\mathbf{t}` is the time range on which the target positions are computed. :param indices: Indices of :class:`~nenupy.schedule.constraint.Constraint.score` on which the score will be evaluated. :type indices: :class:`~numpy.ndarray` :returns: Constraint score. :rtype: `float` """ self._is_numpy_instance(indices) #return np.sum(self.score[indices], axis=-1) return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, target, nslots): self._is_target_instance(target) hourAngle = target.hourAngle transitIdx = np.where( (np.roll(hourAngle, -1) - hourAngle)[:-1] < 0 )[0] scores = np.zeros(hourAngle.size) # Set neighbor slots to non-zero to maximize centering slotShifts = np.arange(nslots) - int(np.floor(nslots/2)) neighborIdx = (transitIdx[:, None] + slotShifts[None, :]).ravel() outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size) neighborIdx = np.delete( neighborIdx, np.argwhere(outOfBounds) ) scores = np.where( np.isin( np.arange(scores.size, dtype=int), neighborIdx ), 0.5, scores ) # Set transit slots to maximal score scores[transitIdx] = 1. self.score = scores[:-1] return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------ AzimuthCnst ------------------------ # # ============================================================= #
[docs] class AzimuthCnst(TargetConstraint): """ .. versionadded:: 1.2.0 """
[docs] def __init__(self, azimuth, weight=1): super().__init__(weight=weight) self.azimuth = azimuth
# --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def azimuth(self): """ """ return self._azimuth @azimuth.setter def azimuth(self, az): az = self._pass_angle_instance(az) if (az.deg < 0.) or (az.deg > 360): raise ValueError( f'`azimuth`={az.deg} deg must fall ' 'between 0 and 360 degrees.' ) self._azimuth = az # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): """ """ self._is_numpy_instance(indices) # return np.sum(self.score[indices], axis=-1) return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, target, nslots): self._is_target_instance(target) azimuths = target.azimuth.rad az = self.azimuth.rad if target.isCircumpolar: az = np.angle(np.cos(az) + 1j*np.sin(az)) complexAzStarts = np.angle( np.cos(azimuths[:-1]) + 1j*np.sin(azimuths[:-1]) ) complexAzStops = np.angle( np.cos(azimuths[1:]) + 1j*np.sin(azimuths[1:]) ) mask = (az >= complexAzStarts) &\ (az <= complexAzStops) mask |= (az <= complexAzStarts) &\ (az >= complexAzStops) else: mask = (az >= azimuths[:-1]) &\ (az <= azimuths[1:]) scores = np.zeros(mask.size) azIndices = np.where(mask)[0] # Set neighbor slots to non-zero to maximize centering slotShifts = np.arange(nslots) - int(np.floor(nslots/2)) if nslots%2 == 0: # if nslots is even, we have to think which slot to include in addition to the max one. # print(np.abs(azimuths[azIndices] - az)) pass neighborIdx = (azIndices[:, None] + slotShifts[None, :]).ravel() outOfBounds = (neighborIdx < 0) + (neighborIdx >= scores.size) neighborIdx = np.delete( neighborIdx, np.argwhere(outOfBounds) ) scores = np.where( np.isin( np.arange(scores.size, dtype=int), neighborIdx ), 0.5, scores ) # Set azimuth found slots to maximal score scores[azIndices] = 1. self.score = scores # self.score = mask.astype(float) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------- LocalSiderealTimeCnst ------------------- # # ============================================================= #
[docs] class LocalSiderealTimeCnst(TargetConstraint):
[docs] def __init__(self, lst: Longitude, strict_crossing: bool = True, weight: float = 1): super().__init__(weight=weight) self.lst = lst self.strict_crossing = strict_crossing
# --------------------------------------------------------- # # ------------------------ Methods ------------------b------ #
[docs] def get_score(self, indices): """ """ self._is_numpy_instance(indices) # return np.mean(self.score[indices], axis=-1) return int((self.score[indices]>0.7).any())
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, target, nslots): """ time: dim + 1 """ self._is_target_instance(target) pi_lon = Longitude(180, unit='deg') lst_starts = copy(target._lst[:-1]) lst_stops = copy(target._lst[1:]) lst_desired = np.repeat(self.lst, lst_starts.size) # Deal with crossing 360/0 case tricky_mask = lst_starts > lst_stops lst_starts[tricky_mask] = (lst_starts[tricky_mask] - pi_lon).wrap_at('360d') lst_stops[tricky_mask] = (lst_stops[tricky_mask] - pi_lon).wrap_at('360d') lst_desired[tricky_mask] = (lst_desired[tricky_mask] - pi_lon).wrap_at('360d') mask = (lst_desired >= lst_starts) & (lst_desired <= lst_stops) # Compute the score scores = np.zeros(mask.size) lst_crossing_indices = np.where(mask)[0] if self.strict_crossing: # Set neighbor slots to non-zero to maximize centering slot_shifts = np.arange(nslots) - int(np.floor(nslots/2)) if nslots%2 == 0: # if nslots is even, we have to think which slot to include in addition to the max one. pass neighbor_idx = (lst_crossing_indices[:, None] + slot_shifts[None, :]).ravel() outOfBounds = (neighbor_idx < 0) + (neighbor_idx >= scores.size) neighbor_idx = np.delete( neighbor_idx, np.argwhere(outOfBounds) ) scores = np.where( np.isin( np.arange(scores.size, dtype=int), neighbor_idx ), 0.5, scores ) # Set azimuth found slots to maximal score scores[lst_crossing_indices] = 1. else: # Number of consecutive false in the array scores[lst_crossing_indices] = 1 bool_array = ~scores.astype(bool) consecutive_falses = np.diff( np.where( np.concatenate( ([bool_array[0]], bool_array[:-1] != bool_array[1:], [True]) ) )[0] )[::2].max() kernel_size = consecutive_falses + 1 if consecutive_falses%2==0 else consecutive_falses kernel = np.arange(kernel_size) - int(np.floor(kernel_size/2)) kernel = 1. - np.abs(kernel)/kernel.max() scores = np.convolve(scores, kernel, mode='same') self.score = scores # self.score = mask.astype(float) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- LocalTimeCnst ----------------------- # # ============================================================= #
[docs] class LocalTimeCnst(ScheduleConstraint): """ .. versionadded:: 1.2.0 """
[docs] def __init__(self, hMin, hMax, weight=1): super().__init__(weight=weight) self.hMin = hMin self.hMax = hMax
# --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def hMin(self): """ """ return self._hMin @hMin.setter def hMin(self, h): if not isinstance(h, Angle): raise TypeError( f'{h} should be of type {type(Angle)}.' ) self._hMin = h @property def hMax(self): """ """ return self._hMax @hMax.setter def hMax(self, h): if not isinstance(h, Angle): raise TypeError( f'{h} should be of type {type(Angle)}.' ) self._hMax = h # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): """ """ self._is_numpy_instance(indices) return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, time, nslots): """ """ self._is_time_instance(time) # Convert time to France local time (take into account # daylight savings) tz = pytz.timezone('Europe/Paris') timezoneTime = map(tz.localize, time.datetime) utcOffset = np.array( [tt.utcoffset().total_seconds() for tt in timezoneTime] ) localTime = time + TimeDelta(utcOffset, format='sec') # Convert the 'hour' part in decimal 'angle' values hours = np.array([tt.split()[1] for tt in localTime.iso]) localHours = Angle(hours, unit='hour').hour # Selection if self.hMin > self.hMax: # If 'midnight' is in the range mask = (localHours <= self.hMin.hour) &\ (localHours >= self.hMax.hour) mask = ~mask else: mask = (localHours >= self.hMin.hour) &\ (localHours <= self.hMax.hour) score = mask[:-1].astype(float) self.score = np.where(score==0, np.nan, score) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------ UTCHourCnst ------------------------ # # ============================================================= #
[docs] class UTCHourCnst(ScheduleConstraint): """Contraint to force an observation block to be scheduled between two fixed UTC hours. """
[docs] def __init__(self, h_min: Angle, h_max: Angle, weight: float = 1): super().__init__(weight=weight) self.h_min = h_min self.h_max = h_max
# --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def h_min(self): """ """ return self._h_min @h_min.setter def h_min(self, h): if not isinstance(h, Angle): raise TypeError( f'{h} should be of type {type(Angle)}.' ) self._h_min = h @property def h_max(self): """ """ return self._h_max @h_max.setter def h_max(self, h): if not isinstance(h, Angle): raise TypeError( f'{h} should be of type {type(Angle)}.' ) self._h_max = h # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): """ """ self._is_numpy_instance(indices) return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, time, nslots): """ """ self._is_time_instance(time) # Convert the 'hour' part in decimal 'angle' values hours = np.array([tt.split()[1] for tt in time.iso]) local_hours = Angle(hours, unit='hour').hour # Selection if self.h_min > self.h_max: # If 'midnight' is in the range mask = (local_hours <= self.h_min.hour) &\ (local_hours >= self.h_max.hour) mask = ~mask else: mask = (local_hours >= self.h_min.hour) &\ (local_hours <= self.h_max.hour) score = mask[:-1].astype(float) self.score = np.where(score==0, np.nan, score) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- TimeRangeCnst ----------------------- # # ============================================================= #
[docs] class TimeRangeCnst(ScheduleConstraint): """ .. versionadded:: 1.2.0 """
[docs] def __init__(self, time_min, time_max, weight=1): super().__init__(weight=weight) self.time_min = time_min self.time_max = time_max
# --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def time_min(self): """ """ return self._time_min @time_min.setter def time_min(self, t): if not isinstance(t, Time): raise TypeError( f'{t} should be of type {type(Time)}.' ) self._time_min = t @property def time_max(self): """ """ return self._time_max @time_max.setter def time_max(self, t): if not isinstance(t, Time): raise TypeError( f'{t} should be of type {type(Time)}.' ) self._time_max = t # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices): """ """ self._is_numpy_instance(indices) return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, time, nslots): """ time: dim + 1 """ self._is_time_instance(time) jds = time.jd if self.time_min.isscalar: mask = (jds >= self.time_min.jd) & (jds <= self.time_max.jd) else: mask = np.sum( (jds[:, None] >= self.time_min.jd) & (jds[:, None] <= self.time_max.jd), axis=1, dtype=bool ) score = mask[:-1].astype(float) self.score = np.where(score==0, np.nan, score) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- NightTimeCnst ----------------------- # # ============================================================= #
[docs] class NightTimeCnst(Constraint): """ """
[docs] def __init__(self, sun_elevation_max=0., scale_elevation=True, weight=1): super().__init__(weight=weight) self.sun_elevation_max = sun_elevation_max self.scale_elevation = scale_elevation
# --------------------------------------------------------- # # ------------------------ Methods ------------------------ # def get_score(self, indices): """ """ return np.mean( np.where( self.score[indices]>0., 1, 0 ) ) # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, sun_elevation): """ """ sun_elevation[sun_elevation > self.sun_elevation_max] = np.nan if all(np.isnan(sun_elevation)): log.warning( "Constraint <NightTimeCnst(sun_elevation_max=" f"{self.sun_elevation_max})> cannot be satisfied" "over the given time range." ) self.score = sun_elevation[1:]*np.nan elif not self.scale_elevation: self.score = sun_elevation/sun_elevation else: elevation_min = np.nanmin(sun_elevation) self.score = sun_elevation/elevation_min return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # --------------------- SunProximityCnst ---------------------- # # ============================================================= # # class SunProximityCnst(Constraint): # def __init__(self, min_angular_sep=0., max_angular_sep=360, weight=1): # super().__init__(weight=weight) # self.min_angular_sep = min_angular_sep # self.max_angular_sep = max_angular_sep # # --------------------------------------------------------- # # # ------------------------ Methods ------------------------ # # def get_score(self, indices): # """ """ # return np.mean( # np.where( # self.score[indices]>0., # 1, # 0 # ) # ) # # --------------------------------------------------------- # # # ----------------------- Internal ------------------------ # # def _evaluate(self, target, sun): # """ # """ # separation = target._fk5.separation(sun._fk5).deg # proximity_mask = (separation >= self.min_angular_sep) * (separation <= self.max_angular_sep) # score = proximity_mask[:-1].astype(float) # self.score = np.where(score==0, np.nan, score) # return self.score # ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- PeriodicCnst ------------------------ # # ============================================================= #
[docs] class PeriodicCnst(ScheduleConstraint): """Constraint to force a block to be scheduled within periodical time intervals in the schedule. Examples -------- .. code-block:: python >>> from nenupy.schedule.constraints import PeriodicCnst >>> from astropy.time import Time, TimeDelta >>> start = Time("2025-12-01 00:00:00") >>> p = TimeDelta(2, format="jd") >>> tol = TimeDelta(0.3, format="jd") >>> cnt = PeriodicCnst( start_time=start, periodicity=p, tolerance=tol ) >>> times = start + np.arange(300) * TimeDelta(1800, format="sec") >>> score = cnt(times) >>> fig = plt.figure(figsize=(10, 5)) >>> ax = fig.add_subplot() >>> for i in range(4): >>> middle = start + i * p >>> ax.axvline(middle.datetime, linestyle=":", color="black") >>> ax.axvspan((middle - tol).datetime, (middle + tol).datetime, color="tab:orange", alpha=0.5) >>> ax.plot(times[:-1].datetime, np.ones(times[:-1].size), marker=".", linestyle="") >>> mask = ~ np.isnan(score) >>> ax.plot(times[:-1].datetime[mask], np.ones(times[:-1].size)[mask], marker=".", linestyle="") >>> ax.set_xlabel("Time") .. figure:: ../_images/schedule_images/periodic_cnst.png :width: 650 :align: center """
[docs] def __init__(self, start_time: Time, periodicity: TimeDelta, tolerance: TimeDelta, weight: float = 1 ): """Return an instance of :class:`~nenupy.schedule.constraints.PeriodicCnst`. Parameters ---------- start_time : :class:`~astropy.time.Time` Center of the first time interval. periodicity : :class:`~astropy.time.TimeDelta` Time duration before the next repetition of the time interval. tolerance : :class:`~astropy.time.TimeDelta` half-width of each time interval. weight : `float`, optional Constraint weight used to ponderate constraints with respect to one another, by default 1 """ super().__init__(weight=weight) self.start_time = start_time self.periodicity = periodicity self.tolerance = tolerance
# --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def get_score(self, indices: np.ndarray) -> float: """Return the constraint score evaluated on a schedule ``indices``. The constraint must have been evaluated first. Parameters ---------- indices : :class:`~numpyp.ndarray` Time indices on which the constraint score should be computed. The indices must match the array stored in :attr:`~nenupy.scedule.constraints.Constraint.score`. Returns ------- `float` The constraint score evaluated as its average on ``indices``. """ self._is_numpy_instance(indices) return np.mean(self.score[indices], axis=-1)
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, time: Tuple, nslots: int = None) -> np.ndarray: """ """ self._is_time_instance(time) integer_time_div = (time - self.start_time).jd % self.periodicity.jd time_mask = (integer_time_div <= self.tolerance.jd) + (integer_time_div >= self.periodicity.jd - self.tolerance.jd) score = time_mask[:-1].astype(float) self.score = np.where(score==0, np.nan, score) return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # -------------------- SolarProximityCnst --------------------- # # ============================================================= #
[docs] class SolarProximityCnst(Constraint): """ """
[docs] def __init__(self, closer: bool = False, min_separation_deg: float = None, max_separation_deg: float = None, scale: bool = True, weight: float = 1 ): super().__init__(weight=weight) self.closer = closer self.min_separation_deg = min_separation_deg self.max_separation_deg = max_separation_deg self.scale = scale
# --------------------------------------------------------- # # ------------------------ Methods ------------------------ # def get_score(self, indices: np.ndarray) -> np.ndarray: """ """ return np.mean( np.where( self.score[indices] > 0., 1, 0 ) ) # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _evaluate(self, target: _Target, sun_position: SSTarget): """ """ separation = target.target.separation(sun_position).deg # Set to NaN intervals where the separation is outside requested values if not (self.min_separation_deg is None): separation[separation <= self.min_separation_deg] = np.nan if not (self.max_separation_deg is None): separation[separation > self.max_separation_deg] = np.nan if all(np.isnan(separation)): log.warning( "Constraint <SolarProximityCnst(min_separation_deg=" f"{self.min_separation_deg}, max_separation_deg={self.max_separation_deg})> " "cannot be satisfied over the given time range." ) # self.score = separation[1:] * np.nan self.score = separation * np.nan else: # sep_min = np.nanmin(separation) sep_max = np.nanmax(separation) # farther_normalized_score = (separation - sep_min) / (sep_max - sep_min) # if self.closer: # self.score = 1 - farther_normalized_score # else: # self.score = farther_normalized_score if self.closer: # Add .1 to avoid 0 self.score = (sep_max + 0.1 - separation) / np.nanmax(sep_max + 0.1 - separation) else: self.score = separation / sep_max if not self.scale: self.score = np.where(self.score > 0, 1, self.score) return self.score else: return self.score
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------ Constraints ------------------------ # # ============================================================= #
[docs] class Constraints(object): """ .. versionadded:: 1.2.0 """
[docs] def __init__(self, *constraints): self._default_el = False self.constraints = constraints self.score = None # Check they are all of unique type unique, count = np.unique( [str(cons.__class__) for cons in self.constraints], return_counts=True ) if any(count > 1): message = ( 'There can only be one constraint type per ' 'observation:' ) for typ, n in zip(unique, count): if n > 1: message += f"\n\t* '{typ}': {n} instances." raise ValueError(message) # Add the elevation constraint by default #if not str(ElevationCnst) in unique: if not np.any(np.isin(np.array([str(ElevationCnst)]), unique)): self.constraints += (ElevationCnst(0.),) self._default_el = True
def __add__(self, other): """ """ if not isinstance(other, Constraint): raise TypeError('') if isinstance(other, ElevationCnst): if self._default_el: # Remove the default elevation constraint # if a new one is added cs = np.array([str(c.__class__) for c in self.constraints]) elCnst_idx = np.where(cs == str(ElevationCnst))[0][0] listCnst = list(self.constraints) listCnst.pop(elCnst_idx) self.constraints = tuple(listCnst) self._default_el = False constraints = self.constraints + (other,) cts = Constraints(*constraints) cts._default_el = self._default_el return cts def __getitem__(self, n): """ """ return self.constraints[n] def __len__(self): """ """ return len(self.constraints) def __del__(self): for constraint in self.constraints: del constraint # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def size(self): """ """ return len(self) @property def weights(self): """ """ return np.array([cnt.weight for cnt in self]) # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def evaluate(self, target, time, nslots=1, sun_elevation=None, sun_position=None): """ """ cnts = np.zeros((self.size, time.size - 1)) unEvaluatedCnst = 0 for i, cnt in enumerate(self): if isinstance(cnt, TargetConstraint) and (target is not None): cnts[i, :] = cnt(target, nslots) elif isinstance(cnt, ScheduleConstraint): cnts[i, :] = cnt(time, nslots) elif isinstance(cnt, NightTimeCnst): cnts[i, :] = cnt(sun_elevation) elif isinstance(cnt, SolarProximityCnst): cnts[i, :] = cnt(target, sun_position) else: unEvaluatedCnst += 1 if unEvaluatedCnst == self.size: cnts += 1. log.debug( 'No defined constraint could be used. Schedule ' 'slot scores have been set to 1...' ) score = np.average(cnts, weights=self.weights, axis=0) self.score = np.where( np.isnan(score), 0, score ) self.score[np.where(np.prod(cnts, axis=0)==0)[0]] = 0 return self.score
def evaluate_various_nslots(self, target: _Target, time: Time, test_indices: List[np.ndarray], sun_elevation=None, sun_position=None) -> List[np.ndarray]: scores = [] # Loop over each set of indices for indices_window in test_indices: # Copy target and only keep the time/coordinates around the boundaries of test_indices to optimize the computations min_idx, max_idx = np.min(indices_window), np.max(indices_window) + 1 if target is None: sliced_target = None else: sliced_target = target[min_idx : max_idx] # Perform a copy without damaging the original object sliced_time = time[min_idx : max_idx] if sun_elevation is None: sun_elev = None else: sun_elev = sun_elevation[min_idx : max_idx] if sun_position is None: sun_pos = None else: sun_pos = sun_position[min_idx : max_idx] nslots = indices_window.size - 1 # Evaluate the constraints n_unevaluated_constraints = 0 constraint_scores = np.zeros((self.size, nslots)) for constraint_i, constraint in enumerate(self): constraint = copy(constraint) # to prevent overwriting initial constraints if isinstance(constraint, TargetConstraint) and (sliced_target is not None): constraint_scores[constraint_i, :] = constraint(sliced_target, nslots) elif isinstance(constraint, ScheduleConstraint): constraint_scores[constraint_i, :] = constraint(sliced_time, nslots) elif isinstance(constraint, NightTimeCnst): constraint_scores[constraint_i, :] = constraint(sun_elev[:-1]) # already taken at mid dt elif isinstance(constraint, SolarProximityCnst) and (sliced_target is not None): constraint_scores[constraint_i, :] = constraint(sliced_target, sun_pos[:-1]) else: n_unevaluated_constraints += 1 if n_unevaluated_constraints == self.size: constraint_scores += 1. log.debug( "No defined constraint could be used. Schedule " "slot scores have been set to 1..." ) # Compute the weighted average score window_score = np.average(constraint_scores, weights=self.weights, axis=0) # Replace NaN values by 0 window_score = np.where(np.isnan(window_score), 0, window_score) # If any constraint is at 0, set the time-score at 0 no matter the other constraints window_score[np.where(np.prod(constraint_scores, axis=0)==0)[0]] = 0 scores.append(window_score) # return the scores return scores
[docs] def plot(self, **kwargs): """ kwargs: figsize figname """ fig = plt.figure( figsize=kwargs.get('figsize', (10, 5)) ) ax = fig.add_subplot() for cnt in self: # Overplot each constraint cnt._plot_constraint(ax=ax, **kwargs) ax.plot( self.score, label="Total" ) ax.set_xlabel("Time index") ax.set_ylabel("Constraint score") ax.legend() # Save or show the figure figname = kwargs.get("figname", "") if figname != "": plt.savefig( figname, dpi=300, bbox_inches="tight", transparent=True ) log.info(f"Figure '{figname}' saved.") else: plt.show() plt.close("all")
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # # ============================================================= # # ============================================================= #