#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
.. _schedule_obsblocks:
**************
Booking Blocks
**************
"""
__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2021, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'Block',
'ObsBlock',
'ReservedBlock'
]
import numpy as np
import functools
from copy import deepcopy
from astropy.time import Time, TimeDelta
from nenupy.schedule.targets import _Target
from nenupy.schedule.constraints import Constraints
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ============================================================= #
# Lookup table of the NenuFAR Key Science Programs accepted by the
# observation blocks. Keys are the lower-case program codes; each entry
# holds the display name and the hexadecimal colour used when drawing.
# NOTE(review): 'Comissioning' is misspelled, but it is a runtime string
# that ends up in block titles, so it is reproduced unchanged here.
KPS = {
    # --- 'es' series ---
    'es00': {'name': 'Comissioning', 'color': '#7DCEA0'},
    'es01': {'name': 'Cosmic Dawn', 'color': '#825A2C'},
    'es02': {'name': 'Exoplanets and stars', 'color': '#87794E'},
    'es03': {'name': 'Pulsars', 'color': '#3F729C'},
    'es04': {'name': 'Transients', 'color': '#8800CC'},
    'es05': {'name': 'Fast Radio Bursts', 'color': '#00ABA9'},
    'es06': {'name': 'Planetary Lightning', 'color': '#D80073'},
    'es07': {'name': 'Jupiter', 'color': '#B8860B'},
    'es08': {'name': 'Galaxy clusters and AGNs', 'color': '#0050EF'},
    'es09': {'name': 'Cluster filament and cosmic magnetism', 'color': '#CC66FF'},
    'es10': {'name': 'Recombination Lines', 'color': '#FA6800'},
    'es11': {'name': 'Sun', 'color': '#008A00'},
    'es12': {'name': 'Radio Gamma', 'color': '#A20025'},
    'es13': {'name': 'SETI', 'color': '#F4A460'},
    'es14': {'name': 'Cas A', 'color': '#517A7A'},
    'es15': {'name': 'Large Survey', 'color': '#6699FF'},
    'es16': {'name': 'LOFAR-NenuFAR', 'color': '#708D23'},
    'es17': {'name': 'Radio-Amateurs', 'color': '#424242'},
    # --- 'lt' series (there is no 'lt08' entry) ---
    'lt00': {'name': 'Comissioning', 'color': '#7DCEA0'},
    'lt01': {'name': 'Cosmic Dawn', 'color': '#825A2C'},
    'lt02': {'name': 'Exoplanets and stars', 'color': '#87794E'},
    'lt03': {'name': 'Pulsars', 'color': '#3F729C'},
    'lt04': {'name': 'Transients', 'color': '#8800CC'},
    'lt05': {'name': 'Fast Radio Bursts', 'color': '#00ABA9'},
    'lt06': {'name': 'Planetary Lightning', 'color': '#D80073'},
    'lt07': {'name': 'Jupiter joint studies', 'color': '#B8860B'},
    'lt09': {'name': 'Galaxies', 'color': '#CC66FF'},
    'lt10': {'name': 'Recombination Lines', 'color': '#FA6800'},
    'lt11': {'name': 'Sun', 'color': '#008A00'},
    'lt12': {'name': 'Radio Gamma', 'color': '#A20025'},
    'lt13': {'name': 'SETI', 'color': '#F4A460'},
    # --- 'rp' series ---
    'rp1a': {'name': 'Faraday tomography of 3C196 field', 'color': '#000000'},  # colour TO BE DEFINED
    'rp1b': {'name': 'NenuFAR Low-Frequency Sky Survey', 'color': '#6699FF'},
    'rp1c': {'name': 'Free-free absorption in Cassiopeia A', 'color': '#517A7A'},
    # --- 'sp' series ---
    'sp16': {'name': 'Student training', 'color': '#708D23'},
    'sp17': {'name': 'Radio-Amateurs', 'color': '#424242'},
}

# Colours associated with the three score quality levels.
STATUS = {'good': '#8CC152', 'medium': '#ECB23C', 'bad': '#E9573F'}
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------------------- Block --------------------------- #
# ============================================================= #
class Block(object):
    """ Ordered container of schedule blocks.

    The objects given at construction are stored in a tuple and
    numbered through their ``blockIdx`` attribute. Containers can be
    concatenated with ``+`` (or :func:`sum`), replicated with ``*``,
    indexed, and filtered with :meth:`get`.

    :param blocks:
        Objects to store, in order. The ``blockIdx`` attribute of each
        of them is overwritten with its position in the container.

    .. versionadded:: 1.2.0
    """

    def __init__(self, *blocks):
        self._blocks = blocks
        self._assign_indices()

    def __len__(self):
        """ Number of stored blocks. """
        return len(self._blocks)

    def __mul__(self, n):
        """ Duplicates the :class:`~nenupy.schedule.obsblocks.Block`
        ``n`` times.

        Every stored block is deep-copied ``n`` times; the copies of a
        given block are adjacent in the result.

        :param n:
            Number of copies of each stored block.
        :type n:
            `int`

        :returns:
            New container holding the copies.
        :rtype:
            :class:`~nenupy.schedule.obsblocks.Block`
        """
        blocks = []
        for block in self._blocks:
            for _ in range(n):
                copied_block = deepcopy(block)
                try:
                    # Do not keep a copy of the target attribute
                    # if it exists: the copies share the original
                    # target (and constraints) object, therefore any
                    # costly computation on target coordinates will
                    # not happen n times.
                    copied_block.target = block.target
                    copied_block.constraints = block.constraints
                except AttributeError:
                    # Attribute target is not found for ReservedBlock
                    pass
                blocks.append(copied_block)
        return Block(*blocks)

    def __add__(self, other):
        """ Concatenates two containers into a new one.

        :raises TypeError:
            If ``other`` is not a
            :class:`~nenupy.schedule.obsblocks.Block`.
        """
        if not isinstance(other, Block):
            raise TypeError(
                'Addition may be made from two `Blocks` '
                'instances. Instead one instance is of type '
                f'`{type(other)}`.'
            )
        blocks = self._blocks + other._blocks
        return Block(*blocks)

    def __radd__(self, other):
        """ Reflected addition.

        ``0 + block`` returns ``block`` itself, which is what makes
        ``sum([block_a, block_b, ...])`` work (``sum`` starts from 0).
        Any other left operand is handed over to :meth:`__add__`,
        which raises ``TypeError`` for non-``Block`` objects.
        """
        if other == 0:
            return self
        # No debugging output here: the TypeError raised by __add__
        # already reports the offending type.
        return self.__add__(other)

    def __getitem__(self, n):
        """ Returns the stored block(s) at index (or slice) ``n``. """
        return self._blocks[n]

    # --------------------------------------------------------- #
    # --------------------- Getter/Setter --------------------- #
    @property
    def size(self):
        """ Number of stored blocks (same as ``len(self)``). """
        return len(self)

    # --------------------------------------------------------- #
    # ------------------------ Methods ------------------------ #
    def get(self, **kwargs):
        """ Selects the blocks whose attribute equals a given value.

        Exactly one ``attribute=value`` pair must be given.

        Example:
            bb = aa.get(program='es00')

        .. note::
            The selected blocks are the same objects as those stored
            here; building the returned container renumbers their
            ``blockIdx`` starting from 0.

        :returns:
            Container of the matching blocks.
        :rtype:
            :class:`~nenupy.schedule.obsblocks.Block`

        :raises ValueError:
            If zero or more than one keyword argument is given.
        """
        if len(kwargs) != 1:
            raise ValueError(
                'Only one key=value is allowed.'
            )
        (attr, value), = kwargs.items()
        blocks = (
            blk for blk in self if getattr(blk, attr) == value
        )
        return Block(*blocks)

    def reset(self):
        """ Sets the ``constraints`` attribute of every stored block
        to ``None``.
        """
        for block in self._blocks:
            block.constraints = None

    # --------------------------------------------------------- #
    # ----------------------- Internal ------------------------ #
    def _assign_indices(self):
        """ Writes each block position into its ``blockIdx``. """
        for i, block in enumerate(self._blocks):
            block.blockIdx = i
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- ObsBlock -------------------------- #
# ============================================================= #
class ObsBlock(Block):
    """ Class to handle observation blocks.

    :param name:
        The name of the observation, for further reference.
    :type name:
        `str`
    :param program:
        The NenuFAR scientific program to which this observation
        belongs. It is lower-cased and must be one of the ``KPS`` keys.
    :type program:
        `str`
    :param target:
        The celestial target.
    :type target:
        :class:`~nenupy.schedule.targets.ESTarget` or
        :class:`~nenupy.schedule.targets.SSTarget`
    :param constraints:
        The observing constraints to apply. ``None`` is replaced by a
        default :class:`~nenupy.schedule.constraints.Constraints`.
    :type constraints:
        :class:`~nenupy.schedule.constraints.Constraints`
    :param duration:
        The requested duration of the observation.
    :type duration:
        :class:`~astropy.time.TimeDelta`
    :param processing_delay:
        Time delay needed after the observation for the processing to
        take place. Only instances of
        :class:`~nenupy.schedule.obsblocks.ObsBlock` with this
        parameter set are compared with each other while computing
        the scheduling. This parameter particularly suits the imaging
        data.
    :type processing_delay:
        :class:`~astropy.time.TimeDelta`
    :param max_extended_duration:
        Once the observation block is booked, the duration of the
        scheduled observation will be extended up to this value.
        The duration extension will cease if the resulting scheduled
        score ever decreases or if another scheduled observation is
        reached.
    :type max_extended_duration:
        :class:`~astropy.time.TimeDelta`

    .. seealso::
        :ref:`observation_request_sec`

    .. rubric:: Attributes Summary

    .. autosummary::

        ~ObsBlock.program
        ~ObsBlock.target
        ~ObsBlock.constraints

    .. rubric:: Attributes and Methods Documentation

    """

    def __init__(
        self,
        name,
        program,
        target,
        constraints=None,
        # NOTE: this default object is built once, when the class is
        # defined, and is shared by every call that omits `duration`.
        duration=TimeDelta(3600, format='sec'),
        processing_delay: TimeDelta = None,
        max_extended_duration: TimeDelta = None
    ):
        self.name = name
        # The three assignments below go through the property setters
        self.program = program
        self.target = target
        self.constraints = constraints
        self.duration = duration
        self.processing_delay = processing_delay
        self.max_extended_duration = max_extended_duration
        self.blockIdx = 0
        # An observation block is a container holding only itself
        super().__init__(self)

    # --------------------------------------------------------- #
    # --------------------- Getter/Setter --------------------- #
    @property
    def program(self):
        """ Lower-case code of the Key Science Program. """
        return self._program

    @program.setter
    def program(self, pg):
        code = pg.lower()
        # Raises KeyError for an unknown program code
        self._isKP(code)
        self._program = code

    @property
    def target(self):
        """ Celestial target of the observation (may be ``None``). """
        return self._target

    @target.setter
    def target(self, src):
        if src is not None:
            if not isinstance(src, _Target):
                raise TypeError(
                    f'`target` should be of type {_Target}.'
                )
            if src._lst is not None:
                # Clear the target from previous computations
                src.reset()
        self._target = src

    @property
    def constraints(self):
        """ Observing constraints attached to the block. """
        return self._constraints

    @constraints.setter
    def constraints(self, ct):
        # Fall back on a default Constraints instance
        self._constraints = Constraints() if ct is None else ct

    @property
    def kpColor(self):
        """ Colour associated with the block program in ``KPS``. """
        program_entry = KPS[self.program]
        return program_entry['color']

    @property
    def title(self):
        """ Three-line label: block index, program code and name,
        observation name. The last two lines are cut at 23 characters.
        """
        max_chars = 23
        index_line = f'ID: {self.blockIdx}'
        program_line = f"{self.program.upper()} - {KPS[self.program]['name']}"
        program_line = program_line[:max_chars]
        name_line = self.name[:max_chars]
        return f'{index_line}\n{program_line}\n{name_line}'

    # --------------------------------------------------------- #
    # ----------------------- Internal ------------------------ #
    @staticmethod
    def _isKP(kp):
        """ Checks that ``kp`` is a key of ``KPS``.

        :returns:
            ``True`` when the program code is known.

        :raises KeyError:
            If ``kp`` is not a key of ``KPS``.
        """
        if kp in KPS:
            return True
        raise KeyError(
            '`program` is not a valid NenuFAR Key Science '
            f'Program, i.e. one of {KPS.keys()}.'
        )
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- ObsBlock2 ------------------------- #
# ============================================================= #
class ObsBlock2(Block):
    """ Observation block that also stores its own booking state.

    Besides the request itself (name, program, target, constraints and
    duration), an instance keeps the slot indices and time window it is
    booked on, derives a score from its constraints and can draw
    itself. This class is not listed in the module ``__all__``.

    :param name:
        The name of the observation.
    :type name:
        `str`
    :param program:
        Program code; it is lower-cased and must be a key of ``KPS``.
    :type program:
        `str`
    :param target:
        The celestial target (``None`` is not accepted).
    :type target:
        :class:`~nenupy.schedule.targets._Target`
    :param constraints:
        The observing constraints. ``None`` is replaced by a default
        :class:`~nenupy.schedule.constraints.Constraints`.
    :type constraints:
        :class:`~nenupy.schedule.constraints.Constraints`
    :param duration:
        The requested duration of the observation.
    :type duration:
        :class:`~astropy.time.TimeDelta`

    .. versionadded:: 1.2.0
    """

    def __init__(
        self, name, program, target,
        constraints=None,
        duration=TimeDelta(3600, format='sec')
    ):
        self.name = name
        self.program = program
        self.target = target
        self.constraints = constraints
        self.duration = duration
        self.blockIdx = 0
        self.isBooked = False

        # These attributes are filled once the ObsBlock has been
        # evaluated over a time range.
        # `_idx` and `nSlots` must exist before `startIdx` is set,
        # because the `startIdx` setter reads/writes them.
        self._idx = None
        self.time_min = None
        self.time_max = None
        self.nSlots = 0
        self.startIdx = None

        # An observation block is a container holding only itself
        super().__init__(self)

    # --------------------------------------------------------- #
    # --------------------- Getter/Setter --------------------- #
    @property
    def program(self):
        """ Lower-case code of the Key Science Program. """
        return self._program

    @program.setter
    def program(self, pg):
        pg = pg.lower()
        # Raises KeyError for an unknown program code
        self._isKP(pg)
        self._program = pg

    @property
    def target(self):
        """ Celestial target of the observation. """
        return self._target

    @target.setter
    def target(self, src):
        if not isinstance(src, _Target):
            raise TypeError(
                f'`target` should be of type {_Target}.'
            )
        self._target = src

    @property
    def constraints(self):
        """ Observing constraints attached to the block. """
        return self._constraints

    @constraints.setter
    def constraints(self, ct):
        if ct is None:
            ct = Constraints()
        self._constraints = ct

    @property
    def startIdx(self):
        """ Index (or array of indices) of the first booked slot.

        Setting a value other than ``None`` also fills ``_idx`` with
        the ``nSlots`` consecutive indices starting there: a 1-D array
        for a scalar start, a 2-D ``(len(st), nSlots)`` array for an
        array of starts.
        """
        return self._startIdx

    @startIdx.setter
    def startIdx(self, st):
        if st is not None:
            idxRange = np.arange(self.nSlots)
            if np.isscalar(st):
                self._idx = idxRange + st
            else:
                # Useful for genetic algorithm
                self._idx = st[:, None] + idxRange[None, :]
        self._startIdx = st

    @property
    def kpColor(self):
        """ Colour associated with the block program in ``KPS``. """
        return KPS[self.program]['color']

    @property
    def statusColor(self):
        """ ``STATUS`` colour matching the current score.

        ``[0, 0.5)`` is 'bad', ``[0.5, 0.8)`` is 'medium' and
        ``[0.8, 1]`` is 'good'. Any other value (including NaN) logs a
        warning and is reported as 'bad'.
        """
        # Evaluate the score once: the property recomputes the mean of
        # all constraint scores on every access.
        score = self.score
        if 0 <= score < 0.5:
            return STATUS['bad']
        elif 0.5 <= score < 0.8:
            return STATUS['medium']
        elif 0.8 <= score <= 1.:
            return STATUS['good']
        else:
            log.warning('Strange...')
            return STATUS['bad']

    @property
    def title(self):
        """ Three-line label: block index, program code and name,
        observation name. The last two lines are cut at 23 characters.
        """
        _charLimit = 23
        blkId = f'ID: {self.blockIdx}'
        kpInfos = ' - '.join(
            [
                self.program.upper(),
                KPS[self.program]['name']
            ]
        )
        kpInfos = kpInfos[:_charLimit]
        obsName = self.name[:_charLimit]
        return f'{blkId}\n{kpInfos}\n{obsName}'

    @property
    def score(self):
        """ Mean, over the constraints, of their score on the booked
        slot indices. Returns ``0.`` while no start index is set.
        """
        if self._idx is None:
            return 0.
        scores = [
            constraint.getScore(self._idx)
            for constraint in self.constraints
        ]
        return np.mean(scores, axis=0)

    # --------------------------------------------------------- #
    # ------------------------ Methods ------------------------ #
    def evaluateScore(self, time, **kwargs):
        """ Evaluates the constraints of the block over ``time``.

        The target position is computed first, unless it already has
        been (``target._lst`` is set).

        :param time:
            Times passed to ``target.computePosition`` and
            ``constraints.evaluate``.
        :param method:
            Keyword argument forwarded to ``constraints.evaluate``
            (default ``'prod'``).
        """
        # Evaluate the target positions over time.
        # If it has already been evaluated do not do it twice.
        if self.target._lst is None:
            self.target.computePosition(time)

        self.constraints.evaluate(
            target=self.target,
            time=time,
            method=kwargs.get('method', 'prod')
        )

        log.debug(
            f"<ObsBlock> #{self.blockIdx} named '{self.name}': "
            "Constraint score evaluated."
        )

    def plot(self, **kwargs):
        """ Plots the target elevation and azimuth over the booked
        time window. Does nothing (but a warning) if ``time_min`` is
        not set.

        :param figsize:
            Figure size, default ``(10, 5)``.
        :param nPoints:
            Number of time steps across the window, default ``50``.
        :param figName:
            If not ``''`` (default), the figure is saved under this
            name instead of being shown.
        """
        import matplotlib.pyplot as plt

        # Check if the obsblock has been booked in the schedule
        if self.time_min is None:
            log.warning(
                f"<ObsBlock> #{self.blockIdx} named '{self.name}' "
                "is not booked."
            )
            return

        # Compute the target position
        nPoints = kwargs.get('nPoints', 50)
        dt = (self.time_max - self.time_min)/nPoints
        times = self.time_min + np.arange(nPoints + 1)*dt
        # Create a copy of the target object to keep self.target intact
        target = deepcopy(self.target)
        target.computePosition(times)

        # Plot the figure
        fig, ax1 = plt.subplots(
            figsize=kwargs.get('figsize', (10, 5))
        )

        color1 = 'tab:blue'
        ax1.plot(
            times.datetime,
            target.elevation,
            color=color1,
            label='Elevation'
        )
        # Mark both ends of the booked window
        for edge in (self.time_min, self.time_max):
            ax1.axvline(
                edge.datetime,
                color='black',
                linestyle='-.'
            )
        ax1.set_title(f'{self.title}')
        ax1.set_xlabel('Time (UTC)')
        ax1.set_ylabel('Elevation (deg)', color=color1)
        ax1.tick_params(axis='y', labelcolor=color1)

        ax2 = ax1.twinx()
        color2 = 'tab:orange'
        ax2.set_ylabel('Azimuth (deg)', color=color2)
        ax2.plot(
            times.datetime,
            target.azimuth,
            color=color2,
            label='Azimuth'
        )
        ax2.tick_params(axis='y', labelcolor=color2)

        fig.tight_layout()

        # Save or show the figure
        figName = kwargs.get('figName', '')
        if figName != '':
            fig.savefig(
                figName,
                dpi=300,
                bbox_inches='tight',
                transparent=True
            )
            log.info(f"Figure '{figName}' saved.")
        else:
            plt.show()
        # Only close the figure created here: closing 'all' would also
        # destroy figures owned by the caller.
        plt.close(fig)

    # --------------------------------------------------------- #
    # ----------------------- Internal ------------------------ #
    @staticmethod
    def _isKP(kp):
        """ Checks that ``kp`` is a key of ``KPS``.

        :returns:
            ``True`` when the program code is known.

        :raises KeyError:
            If ``kp`` is not a key of ``KPS``.
        """
        if kp not in KPS:
            raise KeyError(
                '`program` is not a valid NenuFAR Key Science '
                f'Program, i.e. one of {KPS.keys()}.'
            )
        return True

    def _display(self, ax):
        """ Draws the block on the matplotlib axes ``ax``: a rectangle
        in the program colour, two status-coloured bands (top and
        bottom 10 %), and the title/score texts when the middle of the
        block lies within the current x-limits. Does nothing if
        ``time_min`` is not set.
        """
        import matplotlib.dates as mdates

        if self.time_min is None:
            return

        # Show the block rectangle
        ax.axvspan(
            self.time_min.datetime,
            self.time_max.datetime,
            facecolor=self.kpColor,
            edgecolor='black',
            alpha=0.6
        )

        # Indicate the status
        ax.axvspan(
            self.time_min.datetime,
            self.time_max.datetime,
            ymin=0.9,
            facecolor=self.statusColor,
            edgecolor='black',
        )
        ax.axvspan(
            self.time_min.datetime,
            self.time_max.datetime,
            ymax=0.1,
            facecolor=self.statusColor,
            edgecolor='black',
        )

        # Show the observation block title
        xMin, xMax = ax.get_xlim()
        textPos = (self.time_min + (self.time_max - self.time_min)/2)
        textPosMDate = mdates.date2num(textPos.datetime)

        if xMin <= textPosMDate and textPosMDate < xMax:
            ax.text(
                x=textPos.datetime,
                y=0.5,
                s=self.title,
                horizontalalignment='center',
                verticalalignment='center',
                color='black',
                fontweight='bold',
                rotation=90,
                fontsize=8
            )

            ax.text(
                x=textPos.datetime,
                y=0.05,
                s=f'{self.score:.2f}',
                horizontalalignment='center',
                verticalalignment='center',
                color='black',
                fontsize=8
            )
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- ReservedBlock ----------------------- #
# ============================================================= #
class ReservedBlock(Block):
    """ Class to handle reserved schedule time blocks.

    :param time_min:
        Starting time of the reserved time window.
    :type time_min:
        :class:`~astropy.time.Time`
    :param time_max:
        Ending time of the reserved time window.
    :type time_max:
        :class:`~astropy.time.Time`

    .. rubric:: Methods Summary

    .. autosummary::

        ~ReservedBlock.from_VCR

    .. rubric:: Attributes and Methods Documentation

    """

    def __init__(self, time_min, time_max):
        self.time_min = time_min
        self.time_max = time_max
        self.blockIdx = 0
        self.isBooked = False

        # A reserved block is a container holding only itself
        super().__init__(self)

    # --------------------------------------------------------- #
    # --------------------- Getter/Setter --------------------- #

    # --------------------------------------------------------- #
    # ------------------------ Methods ------------------------ #
    @classmethod
    def from_VCR(cls, file_name):
        """ Instantiates a :class:`~nenupy.schedule.obsblocks.ReservedBlock` object from the `Virtual Control Room <https://gui-nenufar.obs-nancay.fr/Planning/>`_ current booking list.

        The file is read as comma-separated values with one header
        line, the columns being ``start, stop, kp, comment``; only the
        first two are used.

        :param file_name:
            CSV file describing the VCR current booking.
        :type file_name:
            `str`

        :returns:
            Reserved slots from the VCR active bookings: the
            :class:`~nenupy.schedule.obsblocks.ReservedBlock` itself
            for a single booking, a
            :class:`~nenupy.schedule.obsblocks.Block` of reserved
            blocks for several, an empty
            :class:`~nenupy.schedule.obsblocks.Block` if the file
            lists none.
        :rtype:
            :class:`~nenupy.schedule.obsblocks.ReservedBlock`

        .. warning::
            Only users with 'administrator' status may download the booking files.

        """
        # `loadtxt` squeezes a single-row file down to a 0-d array,
        # which cannot be iterated: force at least one dimension.
        bookings = np.atleast_1d(
            np.loadtxt(
                file_name,
                skiprows=1,
                delimiter=',',
                dtype={
                    'names': ('start', 'stop', 'kp', 'comment'),
                    'formats': ('U19', 'U19', 'U4', 'U50')
                }
            )
        )

        if bookings.size == 0:
            # Nothing to reduce: hand back an empty container rather
            # than failing on `reduce` of an empty sequence.
            log.warning(f"No booking found in '{file_name}'.")
            return Block()

        starts = Time(bookings["start"])
        stops = Time(bookings["stop"])

        reserved = [
            cls(time_min=start, time_max=stop)
            for start, stop in zip(starts, stops)
        ]

        # Successive additions merge the reserved blocks into a Block;
        # a lone reserved block is returned as is.
        return functools.reduce(
            lambda x, y: x + y,
            reserved
        )

    def is_within(self, start: Time, stop: Time) -> bool:
        """ Tells whether the reserved window overlaps the time range
        delimited by ``start`` and ``stop``.

        The reserved window overlaps the range when it begins before
        ``stop`` and ends after ``start``. This covers the four
        layouts (inside, overlapping either edge, enclosing) including
        when a boundary coincides with ``start`` or ``stop``; a window
        that merely touches the range (ending exactly at ``start`` or
        beginning exactly at ``stop``) does not overlap.

        :param start:
            Beginning of the time range.
        :type start:
            :class:`~astropy.time.Time`
        :param stop:
            End of the time range.
        :type stop:
            :class:`~astropy.time.Time`

        :returns:
            ``True`` if the two intervals overlap.
        :rtype:
            `bool`
        """
        # `&` (rather than `and`) keeps the test element-wise should
        # the comparisons return arrays.
        return (self.time_min < stop) & (self.time_max > start)

    # --------------------------------------------------------- #
    # ----------------------- Internal ------------------------ #
    def _display(self, ax):
        """ Draws the reserved window on the matplotlib axes ``ax`` as
        a hatched grey rectangle.
        """
        # Show the block rectangle
        ax.axvspan(
            self.time_min.datetime,
            self.time_max.datetime,
            facecolor='0.8',
            edgecolor='black',
            hatch='//'
        )
# ============================================================= #
# ============================================================= #