#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
********
Schedule
********
"""
__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2021, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'_TimeSlots',
'Schedule'
]
from astropy.time import Time, TimeDelta
from astropy.table import Table
import numpy as np
from copy import deepcopy, copy
from nenupy.schedule.obsblocks import (
Block,
ObsBlock,
ReservedBlock,
)
from nenupy.schedule.targets import SSTarget
from nenupy.schedule.geneticalgo import GeneticAlgorithm
import logging
log = logging.getLogger(__name__)
# ============================================================= #
# ============================================================= #
def _isTime(*args):
"""
"""
return all([isinstance(arg, Time) for arg in args])
def _isTDelta(*args):
"""
"""
return all([isinstance(arg, TimeDelta) for arg in args])
STATUS = {
'good': '#8CC152',
'medium': '#ECB23C',
'bad': '#E9573F'
}
randGen = np.random.default_rng()
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ----------------------- ScheduleBlock ----------------------- #
# ============================================================= #
class ScheduleBlock(ObsBlock):
def __init__(self,
name,
program,
target,
constraints,
duration,
dt,
processing_delay,
max_extended_duration
):
super().__init__(
name=name,
program=program,
target=target,
constraints=constraints,
duration=duration,
processing_delay=processing_delay,
max_extended_duration=max_extended_duration
)
self.dt = dt
self.startIdx = None
self.time_min = None
self.time_max = None
self.nSlots = int(np.ceil(self.duration/self.dt))
if self.processing_delay is None:
self.n_delay_slots = 0
else:
self.n_delay_slots = int(np.ceil(self.processing_delay/self.dt))
def __del__(self):
pass
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def isBooked(self):
"""
"""
return self.startIdx is not None
@property
def indices(self):
"""
"""
if self.startIdx is not None:
idxRange = np.arange(self.nSlots)
if np.isscalar(self.startIdx):
return idxRange + self.startIdx
else:
# Useful for genetic algorithm
return self.startIdx[:, None] + idxRange[None, :]
else:
return None
@property
def score(self):
"""
"""
if self.indices is None:
return 0.
else:
scores = []
weights = []
for constraint in self.constraints:
scores.append(constraint.get_score(self.indices))
weights.append(constraint.weight)
#return np.nanmean(scores, axis=0)
return np.average(scores, weights=weights, axis=0)
@property
def statusColor(self):
"""
"""
if 0 <= self.score < 0.5:
return STATUS['bad']
elif 0.5 <= self.score < 0.8:
return STATUS['medium']
elif 0.8 <= self.score <=1.:
return STATUS['good']
else:
log.warning('Strange...')
return STATUS['bad']
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
def evaluate_score(self, time, sun_elevation):
"""
"""
# Evaluate the target positions over time and compute the
# score (product of each contraint score)
# If it has already been evaluated do not do twice
if (self.target is not None) and (self.target._lst is None):
self.target.computePosition(time)
if self.constraints.score is None:
# Compute the weighted mean score for all constraints
self.constraints.evaluate(
target=self.target,
time=time,
nslots=self.nSlots,
sun_elevation=sun_elevation
)
log.debug(
f"<ObsBlock> #{self.blockIdx} named '{self.name}': "
"Constraint score evaluated."
)
def is_within(self, start: Time, stop: Time) -> bool:
""" """
return (self.time_min >= start)*(self.time_max < stop)\
+ (self.time_min < start)*(self.time_max > start)*(self.time_max < stop)\
+ (self.time_max > stop)*(self.time_min > start)*(self.time_min < stop)\
+ (self.time_min < start)*(self.time_max > stop)
def plot(self, **kwargs):
"""
kwargs
figsize
nPoints
figname
"""
import matplotlib.pyplot as plt
# Check if the obsblock has been booked in the schedule
if not self.isBooked:
log.warning(
f"<ObsBlock> #{self.blockIdx} named '{self.name}' "
"is not booked."
)
return
# Compute the target position
nPoints = kwargs.get('nPoints', 50)
dt = (self.time_max - self.time_min)/nPoints
times = self.time_min + np.arange(nPoints + 1)*dt
# Create a copy of the target object to keep self.target intact
target = deepcopy(self.target)
target.computePosition(times)
# Plot the figure
fig, ax1 = plt.subplots(
figsize=kwargs.get('figsize', (10, 5))
)
color1 = 'tab:blue'
ax1.plot(
times.datetime,
target.elevation,
color=color1,
label='Elevation'
)
ax1.axvline(
self.time_min.datetime,
color='black',
linestyle='-.'
)
ax1.axvline(
self.time_max.datetime,
color='black',
linestyle='-.'
)
ax1.set_title(f'{self.title}')
ax1.set_xlabel('Time (UTC)')
ax1.set_ylabel('Elevation (deg)', color=color1)
ax1.tick_params(axis='y', labelcolor=color1)
ax2 = ax1.twinx()
color2 = 'tab:orange'
ax2.set_ylabel('Azimuth (deg)', color=color2)
ax2.plot(
times.datetime,
target.azimuth,
color=color2,
label='Azimuth'
)
ax2.tick_params(axis='y', labelcolor=color2)
fig.tight_layout()
# Save or show the figure
figname = kwargs.get('figname', '')
if figname != '':
plt.savefig(
figname,
dpi=300,
bbox_inches='tight',
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close('all')
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _display(self, ax):
"""
"""
import matplotlib.dates as mdates
if self.time_min is None:
return
# Show the block rectangle
ax.axvspan(
self.time_min.datetime,
self.time_max.datetime,
facecolor=self.kpColor,
edgecolor='black',
alpha=0.6
)
# Indicate the status
ax.axvspan(
self.time_min.datetime,
self.time_max.datetime,
ymin=0.9,
facecolor=self.statusColor,
edgecolor='black',
)
ax.axvspan(
self.time_min.datetime,
self.time_max.datetime,
ymax=0.1,
facecolor=self.statusColor,
edgecolor='black',
)
# Show the observation block title
xMin, xMax = ax.get_xlim()
textPos = (self.time_min + (self.time_max - self.time_min)/2)
textPosMDate = mdates.date2num(textPos.datetime)
if (xMin <= textPosMDate) & (textPosMDate < xMax):
ax.text(
x=textPos.datetime,
y=0.5,
s=self.title,
horizontalalignment='center',
verticalalignment='center',
color='black',
fontweight='bold',
rotation=90,
fontsize=8
)
ax.text(
x=textPos.datetime,
y=0.05,
s=f'{self.score:.2f}',
horizontalalignment='center',
verticalalignment='center',
color='black',
fontsize=8
)
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ---------------------- ScheduleBlocks ----------------------- #
# ============================================================= #
class ScheduleBlocks(object):
"""
.. versionadded:: 1.2.0
"""
def __init__(self, dt):
self.dt = dt
self.blocks = []
self._nSlots = []
self._n_delay_slots = []
self._indices = []
self._idxCounter = 0
def __len__(self):
"""
"""
return len(self.blocks)
def __getitem__(self, n):
"""
"""
return self.blocks[n]
def __del__(self):
for block in self.blocks:
del block
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def size(self):
"""
"""
return len(self.blocks)
@property
def nSlots(self):
"""
"""
return np.array(self._nSlots, dtype=int)
@property
def n_delay_slots(self):
"""
"""
return np.array(self._n_delay_slots, dtype=int)
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
def insert(self, block):
"""
"""
if not isinstance(block, (ObsBlock, Block)):
raise TypeError(
f'<block> should be an {ObsBlock} or {Block} '
f'instance, got object of type {type(block)} instead.'
)
for blk in block:
sb = ScheduleBlock(
name=blk.name,
program=blk.program,
target=blk.target,
constraints=blk.constraints,
duration=blk.duration,
dt=self.dt,
processing_delay=blk.processing_delay,
max_extended_duration=blk.max_extended_duration
)
sb.blockIdx = self._idxCounter
self.blocks.append(sb)
self._nSlots.append(sb.nSlots)
self._n_delay_slots.append(sb.n_delay_slots)
self._indices.append(0)
self._idxCounter += 1
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ _TimeSlots ------------------------- #
# ============================================================= #
class _TimeSlots(object):
"""
.. versionadded:: 1.2.0
"""
def __init__(
self,
time_min,
time_max,
dt=TimeDelta(3600, format='sec')
):
if not (_isTime(time_min, time_max) and _isTDelta(dt)):
raise TypeError(
f'Wrong types in `_TimeSlots({type(time_min)}, '
f'{type(time_max)}, {type(dt)})`.'
)
self.dt = dt
# Compute the time slots
self.starts, self.stops = self._compute_slots_time_range(
time_min=time_min,
time_max=time_max,
dt=dt
)
self._startsJD = self.starts.jd
self._stopsJD = self.stops.jd
self.size = self.starts.size
# Initialize array of free time slots
self.freeSlots = np.ones(self.size, dtype=bool)
# Initialize array of free processing time slots
self.free_processing_slots = np.ones(self.size, dtype=bool)
# Initialize array of slot indices
self.idxSlots = np.arange(self.size, dtype=int)
self._freeIndices = self.idxSlots
log.debug(
f'{self.__class__} instance created with '
f'{self.size} time slots.'
)
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def time_min(self):
""" Start time of the shedule.
:getter: Start time.
:type: :class:`~astropy.time.Time`
"""
return self.starts[0]
@property
def time_max(self):
""" Stop time of the shedule.
:getter: Stop time.
:type: :class:`~astropy.time.Time`
"""
return self.stops[-1]
@property
def dt(self):
""" Schedule granularity (time slot duration).
:setter: Schedule granularity.
:getter: Schedule granularity.
:type: :class:`~astropy.time.TimeDelta`
"""
return self._dt
@dt.setter
def dt(self, d):
self._dt = d
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
def time2idx(self, *time):
""" Converts times to schedule indices.
This method expects either one or two :class:`~astropy.time.Time` object(s).
"""
if not _isTime(*time):
raise TypeError(
f'<time2idx> expects a {Time} object.'
)
if len(time) == 1:
# Find the unique corresponding slot index
mask = (self._startsJD <= time[0].jd) &\
(self._stopsJD > time[0].jd)
elif len(time) == 2:
if time[1] < time[0]:
raise ValueError(
f'start>stop ({time[1].isot} < {time[0].isot})'
' in <time2idx(start, stop)>.'
)
# Find all indices within the time range
mask = (self._stopsJD > time[0].jd) &\
(self._startsJD < time[1].jd)
else:
raise Exception(
'<time2idx()> not properly called.'
)
return self.idxSlots[mask]
def add_booking(self, time_min: Time, time_max: Time) -> None:
""" Changes the status of the time slots comprised
between ``time_min`` and ``time_max`` to 'booked' (i.e., not
available anymore).
In particular, the attribute
:attr:`~nenupy.schedule._TimeSlots.freeSlots` is set
to ``False`` at the corresponding indices.
:param time_min:
Start time.
:type time_min:
:class:`~astropy.time.Time`
:param time_max:
Stop time.
:type time_max:
:class:`~astropy.time.Time`
"""
indices = self.time2idx(time_min, time_max)
if any(~self.freeSlots[indices]):
log.warning(
f"Booking on reserved slots from '{time_min.isot}' "
f"to '{time_max.isot}'."
)
self.freeSlots[indices] = False
self._freeIndices = self.idxSlots[self.freeSlots]
def remove_booking(self, time_min: Time, time_max: Time) -> None:
"""
"""
indices = self.time2idx(time_min, time_max)
self.freeSlots[indices] = True
self._freeIndices = self.idxSlots[self.freeSlots]
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
@staticmethod
def _compute_slots_time_range(time_min: Time, time_max: Time, dt: TimeDelta):
"""
"""
if time_max <= time_min:
raise ValueError(
f'Schedule time_max={time_max.isot} <= '
f'time_min={time_min.isot}.'
)
period = time_max - time_min
time_steps = int( np.ceil(period/dt) )
dt_shifts = np.arange(time_steps)*dt
slot_starts = time_min + dt_shifts
slot_stops = slot_starts + dt
return slot_starts, slot_stops
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------- Schedule -------------------------- #
# ============================================================= #
[docs]
class Schedule(_TimeSlots):
""" Foundation class of observation scheduling.
:param time_min:
Schedule time range lower edge.
:type time_min:
:class:`~astropy.time.Time`
:param time_max:
Schedule time range upper edge.
:type time_max:
:class:`~astropy.time.Time`
:param dt:
Schedule granularity (or time slot duration).
Default is ``1 hour``.
:type dt:
:class:`~astropy.time.TimeDelta`
:Example:
>>> from nenupy.schedule import Schedule
>>> from astropy.time import Time, TimeDelta
>>> schedule = Schedule(
>>> time_min=Time('2021-01-11 00:00:00'),
>>> time_max=Time('2021-01-15 00:00:00'),
>>> dt=TimeDelta(3600, format='sec')
>>> )
.. seealso::
:ref:`scheduling_doc`
.. rubric:: Attributes Summary
.. autosummary::
~nenupy.schedule.schedule._TimeSlots.time_min
~nenupy.schedule.schedule._TimeSlots.time_max
~nenupy.schedule.schedule._TimeSlots.dt
~Schedule.observation_blocks
~Schedule.reserved_blocks
.. rubric:: Methods Summary
.. autosummary::
~Schedule.set_free_slots
~Schedule.match_booking
~Schedule.insert
~Schedule.plot
~Schedule.plot_range
~Schedule.book
~Schedule.export
.. rubric:: Attributes and Methods Documentation
"""
def __init__(
self,
time_min,
time_max,
dt=TimeDelta(3600, format='sec')
):
super().__init__(
time_min=time_min,
time_max=time_max,
dt=dt
)
# self.observation_blocks = None
self.observation_blocks = ScheduleBlocks(dt=self.dt)
self.reserved_blocks = None
# Store the Sun's elevation
sun = SSTarget.fromName('Sun')
sun.computePosition(
Time(
np.append(
self._startsJD,
self._startsJD[-1] + self.dt.jd
),
format='jd'
)
)
elevation = sun.elevation.deg
self.sun_elevation = (elevation[1:] + elevation[:-1])/2
self._obs_extension_done = False
def __getitem__(self, n):
"""
"""
return self.observation_blocks[n]
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def observation_blocks(self):
""" Observation blocks included in the schedule.
:setter: Observation blocks.
:getter: Observation blocks.
:type: :class:`~nenupy.schedule.schedule.ScheduleBlocks`
"""
return self._observation_blocks
@observation_blocks.setter
def observation_blocks(self, obs):
self._observation_blocks = obs
@property
def reserved_blocks(self):
""" Reserved blocks included in the schedule.
No observation can be planned on these time slots.
:setter: Reserved blocks.
:getter: Reserved blocks.
:type: :class:`~nenupy.schedule.obsblocks.ReservedBlock`
"""
return self._reserved_blocks
@reserved_blocks.setter
def reserved_blocks(self, res):
self._reserved_blocks = res
@property
def score(self) -> float:
""" Returns the current schedule score. """
return np.mean([block.score for block in self.observation_blocks])
# @property
# def scheduledBlocks(self):
# """
# """
# list(map(schedule.observation_blocks.__getitem__, [i for i, blk in enumerate(schedule.observation_blocks) if blk.isBooked]))
# return self._scheduledBlocks
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def insert(self, *blocks):
r""" Inserts observation blocks or reserved blocks in the schedule.
:param \*blocks:
Observation blocks or reserved blocks.
:type \*blocks:
:class:`~nenupy.schedule.obsblocks.Block`, :class:`~nenupy.schedule.obsblocks.ObsBlock` or :class:`~nenupy.schedule.obsblocks.ReservedBlock`
:Example:
.. code-block:: python
from nenupy.schedule import Schedule, ESTarget, ObsBlock
from astropy.time import Time, TimeDelta
schedule = Schedule(
time_min=Time('2021-01-11 00:00:00'),
time_max=Time('2021-01-15 00:00:00'),
dt=TimeDelta(3600, format='sec')
)
cas_a = ObsBlock(
name="Cas A",
program="ES00",
target=ESTarget.fromName("Cas A"),
duration=TimeDelta(2*3600, format='sec'),
)
schedule.insert(cas_a)
"""
for blocks_i in blocks:
# if not isinstance(blocks_i, Block):
# raise TypeError(
# f'Expected input of type {Block}>'
# f', got <{blocks_i.__class__}> instead.'
# )
if all([blk.__class__ is ObsBlock for blk in blocks_i]):
self.observation_blocks.insert(blocks_i)
elif all([blk.__class__ is ReservedBlock for blk in blocks_i]):
for blk in blocks_i:
self.add_booking(blk.time_min, blk.time_max)
log.debug(f'ReservedBlock added from {blk.time_min.isot} to {blk.time_max.isot}.')
if self.reserved_blocks is None:
self.reserved_blocks = blocks_i
else:
self.reserved_blocks += blocks_i
else:
raise Exception(
f'Not supposed to happen for type {blocks_i.__class__}!'
)
self._toSchedule = np.ones(self.observation_blocks.size, dtype=bool)
[docs]
def set_free_slots(self, start_times: Time, stop_times: Time) -> None:
""" Assign a :class:`~nenupy.schedule.obsblocks.ReservedBlock`
to every Schedule time interval that is not comprised bewteen
``start_times`` and ``stop_times``.
"""
if start_times.size != stop_times.size:
raise ValueError(
"start_times and stop_times should have the same length."
)
elif start_times.isscalar and stop_times.isscalar:
start_times = start_times.reshape((1,))
stop_times = stop_times.reshape((1,))
elif (start_times.ndim != 1) or (stop_times.ndim != 1):
raise ValueError(
f"start_times ({start_times.ndim}D) and "
f"stop_times ({stop_times.ndim}D) should be 1D."
)
# Treat the first booking
if start_times[0] > self.time_min:
self.insert(
ReservedBlock(
time_min=self.time_min,
time_max=start_times[0]
)
)
# Treat the rest
for blk_start, blk_stop in zip(stop_times[:-1], start_times[1:]):
self.insert(
ReservedBlock(
time_min=blk_start,
time_max=blk_stop
)
)
# Treat the last booking
if stop_times[-1] < self.time_max:
self.insert(
ReservedBlock(
time_min=stop_times[-1],
time_max=self.time_max
)
)
[docs]
def match_booking(self, booking_file: str, key_program: str) -> None:
""" """
bookings = np.loadtxt(
booking_file,
skiprows=1,
delimiter=',',
dtype={
'names': ('start', 'stop', 'kp', 'comment'),
'formats': ('U19', 'U19', 'U4', 'U50')
}
)
# Check that the selected Key program is valid
kp_names = np.unique(bookings['kp'])
if key_program not in kp_names:
raise ValueError(
f'Key program {key_program} does not have any entry in {booking_file}, select a value in {kp_names}.'
)
valid_booking = bookings[bookings['kp'] == key_program]
booking_starts = Time(valid_booking['start'])
booking_stops = Time(valid_booking['stop'])
# Find each time period not in the valid booking range
within_schedule_mask = (booking_starts >= self.time_min)*(booking_stops <= self.time_max)\
+ (booking_starts < self.time_min)*(booking_stops > self.time_min)\
+ (booking_starts < self.time_max)*(booking_stops > self.time_max)
log.debug(f'{np.sum(within_schedule_mask)} bookings are within the schedule.')
# Insert corresponding reserved blocks in between the booking periods
starts = booking_starts[within_schedule_mask]
stops = booking_stops[within_schedule_mask]
# # Treat the first booking
# if starts[0] > self.time_min:
# self.insert(
# ReservedBlock(
# time_min=self.time_min,
# time_max=starts[0]
# )
# )
# for reserve_block_start, reserve_block_stop in zip(stops[:-1], starts[1:]):
# self.insert(
# ReservedBlock(
# time_min=reserve_block_start,
# time_max=reserve_block_stop
# )
# )
# # Treat the first booking
# if stops[-1] < self.time_max:
# self.insert(
# ReservedBlock(
# time_min=stops[-1],
# time_max=self.time_max
# )
# )
self.set_free_slots(start_times=starts, stop_times=stops)
[docs]
def book(self, optimize=False, **kwargs):
r""" Distributes the :attr:`~nenupy.schedule.schedule.Schedule.observation_blocks` over the schedule time slots.
The observing constraints are evaluated over the whole schedule.
:param optimize:
If set to ``False`` an heuristic algorithm is used to perfom the booking.
However faster, this could result in sub-optimized time slots filling.
If set to ``True``, a genetic algorithm is used (see below the configuration keywords).
This method delivers more optimized scheduling solutions.
However, due to the random nature of genetic algorithm behaviors, the results may not be reproducible and may vary from one run to the other.
Default is ``False``.
:type optimize:
`bool`
.. rubric:: Genetic algorithm configuration keywords
:param population_size:
Size of the solution population, i.e., how many distinct soluttions are evolving at the same time.
Default is 20.
:type population_size:
`int`
:param random_individuals:
At each generation, the population of solutions will replace its lower-score ``random_individuals`` solutions by random genome ones.
This allows for genetic diversity.
Default is ``1``.
:type random_individuals:
`int`
:param score_threshold:
Score value (between ``0`` and ``1``) at which the evolution may stop.
Default is ``0.8``.
:type score_threshold:
`float`
:param generation_max:
Maximum generation number at which the evolution has to stop (even if ``score_threshold`` is not reached).
Default is ``1000``.
:type generation_max:
`int`
:param max_stagnating_generations:
If the score does not evolve after ``max_stagnating_generations`` generations, the evolution is stopped.
Default is ``100``.
:type max_stagnating_generations:
`int`
:param selection:
At each generation, pairs of individuals (solutions) are selected to give birth to new children with a mix of their genomes (see ``crossover``).
This selection can be perfomed according to three methods.
``'FPS'`` (Fitness Proportionate Selection): parents are as likely to be picked as their score is high.
``'TNS'`` (Tournament Selection): :math:`k` individuals from the population are selected, the best becomes a parent (where :math:`k = \max( 2, \rm{population\ size}/10 )`).
``'RKS'`` (Rank Selection): individuals are ranked according to their scores, and are more likely to be picked according to their rank. The amplitude of score differences doesn't count.
Default is ``'FPS'``.
:type selection:
`str`
:param crossover:
Whenever parents give birth to children, their genome (i.e., time slot indices for each observation block) is mixed.
This cross-over can be performed according to three methods.
``'SPCO'`` (Single-point crossover): genes from 'parent1' and 'parent2' are interverted after a random index.
``'TPCO'`` (Two-point crossover): genes from 'parent1' and 'parent2' are interverted between two random indices.
``'UNCO'`` (Uniform crossover): every genes from 'parent1' and 'parent2' are interverted with a uniform probability.
Default is ``'SPCO'``.
:type crossover:
`str`
selectionMethod (default FPS)
crossoverMethod (default SPCO)
elitism
:returns:
If ``optimize`` is set to ``False``, nothing is returned.
If ``optimize`` is set to ``True``, the :class:`~nenupy.schedule.geneticalgo.GeneticAlgorithm` instance is returned.
The method :class:`~nenupy.schedule.geneticalgo.GeneticAlgorithm.plot` can then be called to visualize the evolution of the solution populations.
:rtype:
``None`` or :class:`~nenupy.schedule.geneticalgo.GeneticAlgorithm`
"""
# Ré-initialize the booking, in case the user calls this method several times
self._reset_observation_block_bookings()
# Pre-compute the constraints scores
self._compute_block_constraint_scores(**kwargs)
n_unscheduled_blocks = 0
# Raise warning if the observation block are impossible to fit
if sum(self._toSchedule) == 0:
log.warning(
'Required observation blocks have constraints '
'unfitted for this schedule.'
)
return
else:
log.info(
f'Fitting {sum(self._toSchedule)} observation blocks...'
)
if optimize:
# All the observation blocks are booked at
# once with the genetic algorithm.
ga = GeneticAlgorithm(
populate=self._populate,
fitness=self._fitness,
mutation=self._mutation,
populationSize=kwargs.get('population_size', 20)
)
# Find out the best observation schedule
ga.evolve(
**kwargs
)
# Book the observation_blocks on the schedule
k = 0
for i, blk in enumerate(self.observation_blocks):
if not self._toSchedule[i]:
# The observation had a null score over the schedule
continue
best_start_index = ga.bestGenome[k]
best_stop_index = best_start_index + blk.nSlots - 1
k += 1
if all(self._cnstScores[i, best_start_index:best_stop_index + 1] == 0):
n_unscheduled_blocks += 1
log.warning(
f"<ObsBlock> #{blk.blockIdx} '{blk.name}' cannot be scheduled."
)
continue
blk.startIdx = best_start_index
blk.time_min = self.starts[best_start_index]
blk.time_max = self.stops[best_start_index + blk.nSlots-1]
score = self.fine_tune(max_it=kwargs.get("fine_tune_max_iterations", 1000))
# Remove blocks that are overlapping
# Lowest scores are removed in priority
scores = [blk.score for blk in self.observation_blocks]
for _, blk in sorted(zip(scores, self.observation_blocks), key=lambda pair: pair[0]):
if blk.startIdx is None: continue
start_idx = blk.startIdx
stop_idx = blk.startIdx + blk.nSlots
if not all(self.freeSlots[start_idx:stop_idx]):
n_unscheduled_blocks += 1
blk.startIdx = None # unbook the block
log.warning(f"<ObsBlock> #{blk.blockIdx} is overlapping other blocks.")
continue
self.freeSlots[start_idx:stop_idx] = False
log.info(
f'{sum(self._toSchedule) - n_unscheduled_blocks}/'
f'{sum(self._toSchedule)} observation blocks scheduled '
f'({sum(~self._toSchedule)} impossible to fit).'
)
return ga
else:
block_indices = np.arange(self.observation_blocks.size)
if kwargs.get('sort_by_difficulty', False):
sort_idx = np.argsort(np.sum(self._cnstScores, axis=1))
block_indices = block_indices[sort_idx]
elif kwargs.get('sort_by_availability', False):
sort_idx = np.argsort(np.sum(self._cnstScores > 0, axis=1))
block_indices = block_indices[sort_idx]
# Block are booked iteratively
# for i, blk in enumerate(self.observation_blocks):
for i in block_indices:
blk = self.observation_blocks[i]
if (not self._toSchedule[i]) or (blk.isBooked):
continue
# Construct a mask to avoid setting the obsblock where it cannot fit
if blk.n_delay_slots != 0:
freeSlotsShifted = self.freeSlots.copy() * self.free_processing_slots
else:
freeSlotsShifted = self.freeSlots.copy()
# for j in range(blk.nSlots):
# freeSlotsShifted *= np.roll(self.freeSlots, -j)
# freeSlotsShifted[-blk.nSlots:] = self.freeSlots[-blk.nSlots:]
mask_idx = (np.where(~freeSlotsShifted)[0][:, None] - np.arange(blk.nSlots)[None, ::-1]).ravel()
mask_idx = np.unique(mask_idx)
mask_idx = mask_idx[mask_idx >= 0]
freeSlotsShifted[mask_idx] = False
# Find the best spot
score = self._cnstScores[i, :] * self.freeSlots * freeSlotsShifted
if all(score==0):
n_unscheduled_blocks += 1
log.warning(
f"<ObsBlock> #{i} '{blk.name}' cannot be scheduled."
)
continue
bestStartIdx = np.argmax(
score
)
# Assign a start and stop time to the block and update the free slots
blk.startIdx = bestStartIdx
bestStopIdx = bestStartIdx + blk.nSlots - 1
self.freeSlots[bestStartIdx:bestStopIdx + 1] = False
if blk.n_delay_slots != 0:
# Update the processing reserved slots
start_proc = bestStartIdx - blk.n_delay_slots
start_proc = 0 if start_proc < 0 else start_proc
stop_proc = bestStopIdx + blk.n_delay_slots + 1
stop_proc = self.size if stop_proc > self.size else stop_proc
self.free_processing_slots[start_proc:stop_proc] = False
blk.time_min = self.starts[bestStartIdx]
blk.time_max = self.stops[bestStopIdx]
log.info(
f'{sum(self._toSchedule) - n_unscheduled_blocks}/'
f'{sum(self._toSchedule)} observation blocks scheduled '
f'({sum(~self._toSchedule)} impossible to fit).'
)
[docs]
def extend_scheduled_observations(self, relative_score_threshold: float = 0.8) -> None:
""" """
if self._obs_extension_done:
log.warning("Scheduled observations have already been extended.")
return
log.info("Extending scheduled observations...")
times = Time(
np.append(
self._startsJD,
self._startsJD[-1] + self.dt.jd
),
format='jd'
)
# Prepare the available extensions for each observation block
extend_mask = []
# slot_extension = []
slot_extension_indices = []
# indices = []
for block in self.observation_blocks:
if not block.isBooked:
continue
extdt = block.max_extended_duration
extend_mask.append(extdt is not None)
try:
n_slots = int(np.ceil(extdt.sec/self.dt.sec))
except AttributeError: # extdt is None, and doesnt have .sec
slot_extension_indices.append([])
continue
# slot_extension.append(n_slots if extdt is not None else 0)
# indices.append(block.blockIdx)
# Compute the constraint score for each possibility (i.e. from self.nSlots to n_slots)
extended_windows_size = np.arange(block.nSlots + 1, n_slots + 1) + 1 # Add 1 to evaluate the constraints
extended_windows_slots = [np.arange(window_size) - int(window_size/2) + block.startIdx + int(block.nSlots/2) for window_size in extended_windows_size]
# Deal with edges (don't check indices outside of the Schedule range)
#extended_windows_slots = [window_indices[(window_indices >= 0) * (window_indices < self.size - 1)] for window_indices in extended_windows_slots] # self.size - 1 to compensate for the constraints evaluation
extended_windows_slots = [window_indices[(window_indices >= 0) * (window_indices < self.size)] for window_indices in extended_windows_slots]
# Compute the constraint score
constraint_score = block.constraints.evaluate_various_nslots(
target=block.target,
time=times,
test_indices=extended_windows_slots,
sun_elevation=self.sun_elevation
)
# Compute the mean score
def treat(data):
if np.prod(data) == 0:
return 0.
else:
return np.mean(data)
constraint_score = np.array(list(map(treat, constraint_score)))
# constraint_score = np.array(list(map(np.mean, constraint_score)))
# Filter out the constraint score to discard those that diminish the block score
# Add a relative threshold in order to filter...
positive_gradient_mask = constraint_score >= constraint_score[0] * relative_score_threshold
slot_extension_indices.append([slot_indices for slot_indices, better_option in zip(extended_windows_slots, positive_gradient_mask) if better_option])
# Iteratively try to increase the observation blocks
iteration_i = 0
max_iteration = max([len(indices) for indices in slot_extension_indices])
while iteration_i < max_iteration:
booked_block_i = 0
for block in self.observation_blocks:
# If the block has not been booked, don't consider it
if (not block.isBooked):
continue
# If the block can be extended, try to do it
if extend_mask[booked_block_i]:
available_indices = slot_extension_indices[booked_block_i]
# Iteratively try to grow
try:
new_block_indices = available_indices[iteration_i]
# If the number of free slots potentially occupied by the new indices is bigger, then extend the block
# + block.nSlots because the block is already scheduled and counts as occupying freeSlots
if sum(self.freeSlots[new_block_indices]) + block.nSlots > block.nSlots:
new_block_indices = np.array([idx for idx in new_block_indices if (self.freeSlots[idx] or (idx in block.indices))])
# Check that the new_slots are continuous (they could be overlapping a small nearby observation)
if np.all(np.diff(new_block_indices) == 1):
block.startIdx = new_block_indices[0]
new_block_nslots = len(new_block_indices)
block.duration = new_block_nslots * self.dt
block.nSlots = new_block_nslots
block.time_min = self.starts[block.startIdx]
block.time_max = self.stops[block.startIdx + block.nSlots - 1]
self.freeSlots[block.startIdx:block.startIdx + block.nSlots] = False
else:
pass
except IndexError:
# Max growth reached
pass
booked_block_i += 1
iteration_i += 1
self._obs_extension_done = True
[docs]
def fine_tune(self, max_it: int = 1000) -> None:
"""
"""
log.info("(Fine tunning) Launching...")
scores = []
# Loop until the score drops
while len(scores) < max_it:
# Gather starting index and size of each schedule obsblock
start_indices = []
nslots = []
delay_nslots = []
indices = []
for block in self.observation_blocks:
if not block.isBooked:
continue
start_indices.append(block.startIdx)
nslots.append(block.nSlots)
indices.append(block.blockIdx)
delay_nslots.append(block.n_delay_slots)
# Sort the by increasing index
start_indices_sorted = np.argsort(start_indices)
start_indices = np.array(start_indices, dtype=int)[start_indices_sorted]
nslots = np.array(nslots, dtype=int)[start_indices_sorted]
indices = np.array(indices, dtype=int)[start_indices_sorted]
delay_nslots = np.array(delay_nslots, dtype=int)[start_indices_sorted]
# Associate each observation blocks with its score before and after
max_scores = np.zeros((len(indices), 2)) # (number of booked blocks, score just before/score just after)
for i in range(start_indices_sorted.size):
block_index = indices[i]
block_score = self._cnstScores[block_index, start_indices[i]]
id1 = i - 1 # index of previous before
gap1 = np.arange(start_indices[id1] + nslots[id1] if id1 > 0 else 0, start_indices[i])
score1 = -1 if gap1.size == 0 else self._cnstScores[block_index, gap1[-1]] - block_score
id2 = i + 1 # index of next block
gap2 = np.arange(start_indices[i] + 1, start_indices[id2] - nslots[i] + 1 if id2 <= (len(indices) - 1) else self.size - 1)
score2 = -1 if gap2.size == 0 else self._cnstScores[block_index, gap2[0]] - block_score
if delay_nslots[i] != 0:
# If this is requires a processing delay, we must check for the previous and next similar
# observation, which may not be the very previous or very next.
# Override immediate left/right slot if a similar observation is too close.
delay_obs = np.argwhere(delay_nslots != 0)
# Left
prev_delay_idx = delay_obs[np.roll(delay_obs == i, -1)][0]
if prev_delay_idx > i:
# This is the first observation block with a delay constraint
pass
elif start_indices[prev_delay_idx] + nslots[prev_delay_idx] + delay_nslots[prev_delay_idx] >= start_indices[i]:
score1 = -1
# Right
next_delay_idx = delay_obs[np.roll(delay_obs == i, +1)][0]
if next_delay_idx < i:
# This is the last observation block with delay constraint
pass
elif start_indices[i] + nslots[i] + delay_nslots[i] >= start_indices[next_delay_idx]:
score2 = -1
# Score 'gradients'
max_scores[i, :] = np.array([
score1,
score2
])
# Find out the maximal score difference
left_right_max = np.argmax(max_scores, axis=1)
max_score_diff = np.max(max_scores[np.arange(left_right_max.size), left_right_max])
max_score_block_idx = np.argmax(max_scores[np.arange(left_right_max.size), left_right_max])
if max_score_diff <= 0:
log.info("(Fine tunning) No positive gradient available. End of fine-tunning loop.")
break
# Shift the block 1dt left or right
block = self.observation_blocks[indices[max_score_block_idx]]
block.startIdx += -1 if left_right_max[max_score_block_idx] == 0 else 1
block.time_min = self.starts[block.startIdx]
block.time_max = self.stops[block.startIdx + block.nSlots - 1]
direction = 'left' if left_right_max[max_score_block_idx] == 0 else 'right'
log.debug(f'Shifting block #{indices[max_score_block_idx]} to the {direction}.')
# Compute the score
scores.append(self.score)
else:
log.info("(Fine tunning) Maximum number of iterations reached.")
log.info(f"(Fine tunning) End after {len(scores)} iterations.")
return scores
[docs]
def export(self, score_min=0):
""" Exports the current schedule once :meth:`~nenupy.schedule.schedule.Schedule.book` has been called.
Every observation with a ``score`` higher than ``score_min`` is included in the export.
:param score_min:
Minimal :attr:`~nenupy.schedule.schedule.Schedule.obervation_blocks`'s score to be exported.
Default is ``0``.
:type scoremin:
`float`
:returns:
Scheduled observation blocks as a :class:`~astropy.table.Table`.
This object can further be converted to any user requirements using
the :meth:`astropy.io.ascii.write` method
(see the list of supported `formats <https://docs.astropy.org/en/stable/io/ascii/index.html#supported-formats>`_).
:rtype:
:class:`~astropy.table.Table`
"""
if not isinstance(score_min, (float, int)):
raise TypeError(
'<score_min> should be a number.'
)
elif score_min < 0:
raise ValueError(
'<score_min> should be a positive.'
)
elif score_min > 1:
raise ValueError(
'<score_min> cannot be greater than 1.'
)
names = []
index = []
starts = []
stops = []
programs = []
scores = []
for blk in self.observation_blocks:
if not blk.isBooked:
continue
score = blk.score
if score < score_min:
continue
scores.append(score)
names.append(blk.name)
index.append(blk.blockIdx)
starts.append(blk.time_min.isot)
stops.append(blk.time_max.isot)
programs.append(blk.program)
start_array = Time(starts, format='isot')
chron = np.argsort(start_array)
tab = Table()
tab['obsid'] = np.array(index, dtype=int)[chron]
tab['name'] = np.array(names, dtype=str)[chron]
tab['program'] = np.array(programs, dtype=str)[chron]
tab['start'] = start_array[chron]
tab['stop'] = Time(stops, format='isot')[chron]
tab['score'] = np.array(scores)[chron]
return tab
[docs]
def plot_range(self, start_time: Time, stop_time: Time, **kwargs) -> None:
""" Plots the current schedule.
.. rubric:: Data display keywords
:param start_time:
Minimal time to display.
:type start_time:
:class:`~astropy.time.Time`
:param stop_time:
Maximal time to display.
:type stop_time:
:class:`~astropy.time.Time`
.. rubric:: Plotting layout keywords
:param grid:
If set to ``True``, the time slots are separated by vertical lines.
Default is ``True``.
:type grid:
`bool`
:param figname:
Name of the file (absolute or relative path) to save the figure.
Default is ``''`` (i.e., only show the figure).
:type figname:
`str`
:param figsize:
Set the figure size.
Default is ``(15, 3)``.
:type figsize:
`tuple`
"""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Initialize the figure
_, ax = plt.subplots(
figsize=kwargs.get("figsize", (15, 3))
)
# Display granularity
if kwargs.get("grid", True):
time_mask = (self._startsJD >= start_time.jd)*(self._stopsJD <= stop_time.jd)
for slot_start in self.starts[time_mask]:
ax.axvline(
slot_start.datetime,
color="gray",
linestyle="-",
linewidth=0.5
)
# Display unavailable slots
if self.reserved_blocks is not None:
for booked_block in self.reserved_blocks:
if not booked_block.is_within(start_time, stop_time):
continue
booked_block._display(ax=ax)
# Display observation blocks
if self.observation_blocks is not None:
for obs_block in self.observation_blocks:
if not obs_block.isBooked:
continue
if not obs_block.is_within(start_time, stop_time):
continue
obs_block._display(ax=ax)
ax.set_xlim(
left=start_time.datetime,
right=stop_time.datetime
)
# Formating
ax.yaxis.set_visible(False)
h_fmt = mdates.DateFormatter("%y-%m-%d\n%H")
ax.xaxis.set_major_formatter(h_fmt)
# Save or show the figure
figname = kwargs.get("figname", "")
if figname != "":
plt.savefig(
figname,
dpi=300,
bbox_inches="tight",
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close("all")
[docs]
def plot(self, days_per_line=1, **kwargs):
""" Plots the current schedule.
.. rubric:: Data display keywords
:param days_per_line:
Number of days to plots per line.
Default is ``1``.
:type days_per_line:
`int`
.. rubric:: Plotting layout keywords
:param grid:
If set to ``True``, the time slots are separated by vertical lines.
Default is ``True``.
:type grid:
`bool`
:param figname:
Name of the file (absolute or relative path) to save the figure.
Default is ``''`` (i.e., only show the figure).
:type figname:
`str`
:param figsize:
Set the figure size.
Default is ``(15, 3*lines)``.
:type figsize:
`tuple`
"""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Set relevant variables for the plot
time_min = Time(self.starts[0].isot.split('T')[0])
time_max = self.stops[-1]
# Find the number of sub-plots to display the schedule
if not isinstance(days_per_line, TimeDelta):
days_per_line = TimeDelta(
days_per_line,
format='jd'
)
n_subplots = int(np.ceil((time_max - time_min)/days_per_line))
# Initialize the figure
fig, axs = plt.subplots(
nrows=n_subplots,
ncols=1,
figsize=kwargs.get('figsize', (15, 3*n_subplots))
)
# Iterate over the sub-plots
try:
isIterable = iter(axs)
except:
axs = [axs]
for i, ax in enumerate(axs):
# Plot time limits
tiMin = time_min + i*days_per_line
tiMax = time_min + (i + 1)*days_per_line
ax.set_xlim(
left=tiMin.datetime,
right=tiMax.datetime
)
# Display granularity
if kwargs.get('grid', True):
tMask = (self._startsJD >= tiMin.jd) *\
(self._stopsJD <= tiMax.jd)
for slot_start in self.starts[tMask]:
ax.axvline(
slot_start.datetime,
color='gray',
linestyle='-',
linewidth=0.5
)
# Display unavailable slots
if self.reserved_blocks is not None:
for bookedBlock in self.reserved_blocks:
bookedBlock._display(ax=ax)
# Display observation blocks
if self.observation_blocks is not None:
for obsBlock in self.observation_blocks:
if not obsBlock.isBooked:
continue
obsBlock._display(ax=ax)
# Formating
ax.yaxis.set_visible(False)
h_fmt = mdates.DateFormatter('%y-%m-%d\n%H')
ax.xaxis.set_major_formatter(h_fmt)
# Save or show the figure
figname = kwargs.get('figname', '')
if figname != '':
plt.savefig(
figname,
dpi=300,
bbox_inches='tight',
transparent=True
)
log.info(f"Figure '{figname}' saved.")
else:
plt.show()
plt.close('all')
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _compute_block_constraint_scores(self, **kwargs):
"""
"""
log.info(
'Evaluating observation block constraints over '
'the schedule...'
)
self._cnstScores = np.zeros(
(self.observation_blocks.size, self.size)
)
self._maxScores = np.zeros(self.observation_blocks.size)
# Compute the constraint score over the schedule if it
# has not been previously been done
times = Time(
np.append(
self._startsJD,
self._startsJD[-1] + self.dt.jd
),
format='jd'
)
for i, blk in enumerate(self.observation_blocks):
if blk.constraints.score is None:
# If != 1, the constraint score has already been computed
# Append the last time slot to get the last slot top
blk.evaluate_score(time=times, sun_elevation=self.sun_elevation)
# Set other attributes relying on the schedule
blk.nSlots = int(np.ceil(blk.duration/self.dt))
# Get the constraint and maximum possible score
sliding_indices = np.arange(blk.nSlots)[None, :] +\
np.arange(self.size - blk.nSlots)[:, None]
self._cnstScores[i, :-blk.nSlots] = np.mean(
blk.constraints.score[sliding_indices],
axis=1
)
# Set constraint score to zero if forbidden index
forbidden_mask = np.any(~self.freeSlots[sliding_indices], axis=1)
self._cnstScores[i, :-blk.nSlots][forbidden_mask] *= 0
self._maxScores[i] = np.max(self._cnstScores[i, :])
# Check that the constraints are non zero
if self._maxScores[i] == 0:
self._toSchedule[i] = False
log.warning(
f"<ObsBlock> #{blk.blockIdx} '{blk.name}'"
" has a null score over the schedule."
)
log.info(
f'{self.observation_blocks.size} observation blocks have been '
'successfully evaluated.'
)
def _reset_observation_block_bookings(self) -> None:
""" Loops through all observation_blocks,
if they are booked, set them as brand new and
release the corresponding schedule booking slots.
"""
for block in self.observation_blocks:
if block.startIdx is None:
# The observation block has not been booked, skip
continue
# Free the schedule slots
self.freeSlots[block.startIdx:block.startIdx + block.nSlots] = True
if block.n_delay_slots != 0:
self.free_processing_slots[block.startIdx - block.n_delay_slots - 1: block.startIdx + block.nSlots + block.n_delay_slots] = True
# Reset the observation block as un-booked
block.startIdx = None
def _bound_to_schedule(self, indices: np.ndarray) -> np.ndarray:
"""
startIndices = (schedule_size)
return startIndices = (schedule_size)
"""
indices[indices < 0] = 0
beyond = (indices + self.observation_blocks.nSlots[self._toSchedule]) >= self.idxSlots.size
indices[beyond] = self.idxSlots.size - 1
indices -= beyond*self.observation_blocks.nSlots[self._toSchedule]
return indices
def _populate(self, n):
"""
"""
# return self._bounds(
# randGen.integers(
# low=0,
# high=self.idxSlots.size,
# size=(n, self.observation_blocks.size),
# dtype=np.int64
# )
# )
n_obs_blocks = np.sum(self._toSchedule)
# population = np.zeros(
# (n, self.observation_blocks.size),
# dtype=int
# )
population = np.zeros(
(n, n_obs_blocks),
dtype=int
)
for i in range(n_obs_blocks):
population[:, i] = randGen.choice(
np.where(self._cnstScores[self._toSchedule][i] != 0)[0],
# replace=False,
size=n,
)
return self._bound_to_schedule(population)
def _fitness(self, population):
"""
population : np.array((children, genome))
"""
# Evaluate the fitness of all the observation blocks over
# the cumulative (on nSlots) constraint scores and normalize
# by the maximum available score within the schedule
genome_fitness = np.diagonal(
self._cnstScores[self._toSchedule].T[population],
axis1=1,
axis2=2
)/self._maxScores[self._toSchedule]
#has_zeros = np.any(genome_fitness==0., axis=1)
fitness = np.mean(
genome_fitness,
axis=1
)
#fitness[has_zeros] = 0.
# Find out which individual in the population contains blocks
# that overlaps with one another.
# Sort the population block indices because we don't care
# about the last one
# sortedIdx = np.argsort(population)
# overlaps = np.any(
# np.diff(
# np.take_along_axis(
# population,
# sortedIdx,
# axis=1
# ),
# axis=1
# ) - self.observation_blocks.nSlots[self._toSchedule][sortedIdx][:, :-1] < 0,
# axis=1
# )
# # The fitness is the product of the constraint score and
# # is set to 0 whenever there are overlapping blocks.
# return fitness * ~overlaps
# Reduce the fitness if blocks with n_delay_slots != 0 overlap
are_delay_constrained = self.observation_blocks.n_delay_slots[self._toSchedule] > 0
population_delay = population[:, are_delay_constrained]
sorted_indices = np.argsort(population_delay)
delays = self.observation_blocks.n_delay_slots[self._toSchedule][sorted_indices][:, :-1]
nslots = self.observation_blocks.nSlots[self._toSchedule][sorted_indices][:, :-1]
overlaps_delay = np.sum(
np.diff(
np.take_along_axis(
population_delay,
sorted_indices,
axis=1
),
axis=1
) - (delays + nslots) < 0,
axis=1
)
# Reduce the fitness if observations temporally overlap
sortedIdx = np.argsort(population)
overlaps = np.sum(
np.diff(
np.take_along_axis(
population,
sortedIdx,
axis=1
),
axis=1
) - self.observation_blocks.nSlots[self._toSchedule][sortedIdx][:, :-1] < 0,
axis=1
)
# The fitness is the product of the constraint score and
# is reduced whenever there are overlapping blocks.
return fitness/(overlaps + overlaps_delay + 1)
def _mutation(self, genome):#, scaleOnFitness=False):
"""
"""
idx = randGen.integers(
low=0,
high=genome.size
)
genome[idx] = randGen.integers(
low=0,
high=self.idxSlots.size
)
return self._bound_to_schedule(genome)
# if scaleOnFitness:
# fitness = self._fitness(genome[None, :])[0]
# nMutations = int(np.ceil(genome.size * (1 - fitness)))
# idx = randGen.choice(
# np.arange(genome.size),
# replace=False,
# size=nMutations
# )
# genome[idx] = randGen.choice(
# self._freeIndices,
# size=nMutations
# )
# else:
# idx = randGen.integers(
# low=0,
# high=genome.size
# )
# genome[idx] = randGen.integers(
# low=0,
# high=self.idxSlots.size
# )
# return self._bounds(genome)
# ============================================================= #
# ============================================================= #