# Source code for nenupy.schedule.open_time

#! /usr/bin/python3
# -*- coding: utf-8 -*-


"""
    *******************
    Open time functions
    *******************

    TODO
    - check that there is no overlap in a single KP's input
    - BUG wrong display event width when overlapping several other non-overlapping events
"""


__author__ = "Alan Loh"
__copyright__ = "Copyright 2024, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
    "XLSFile",
    "NenuCalendar"
]



import numpy as np
import re
import functools
from itertools import tee, islice, chain
import operator
import os
from typing import List, Dict, Tuple
from ics import Calendar, Event
import openpyxl
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell import Cell, MergedCell
from astropy.time import Time, TimeDelta
import calendar
import astropy.units as u
from astropy.coordinates import Angle
from astropy.table import Table
import datetime
import pytz

import matplotlib.dates as mdates
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.backends.backend_pdf as bkpdf

import logging
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


# Month-day ("MM-DD") strings marking the first day of each season.
# They are combined with the year of the date under test in `_season_from_time`.
SEASON = {
    "winter_start": "09-30",
    "summer_start": "03-31"
}
# For each season, a (morning, evening) pair of [start, stop] hour boundaries.
# `_sort_night_time` counts the hours before the morning stop and after the
# evening start as night time.
NIGHT_TIME = {
    "winter": (Angle([0, 4], unit="hourangle"), Angle([20, 24], unit="hourangle")), # local hours
    "summer": (Angle([0, 3], unit="hourangle"), Angle([19, 24], unit="hourangle"))
}


def _season_from_time(day: Time) -> str:
    """Return the season (``"summer"`` or ``"winter"``) in which ``day`` falls.

    The boundaries are the ``SEASON`` month-day strings applied to the year
    of ``day``: summer runs from ``summer_start`` (included) up to
    ``winter_start`` (excluded); any other date is winter.

    Parameters
    ----------
    day : Time
        The date to classify.

    Returns
    -------
    str
        Either ``"summer"`` or ``"winter"``.
    """
    # The year is the first colon-separated field of the 'yday' representation
    current_year = day.yday.split(":")[0]
    summer_begins = Time(f"{current_year}-{SEASON['summer_start']}")
    winter_begins = Time(f"{current_year}-{SEASON['winter_start']}")

    in_summer = (day >= summer_begins) and (day < winter_begins)
    season = "summer" if in_summer else "winter"

    log.debug(f"{day.isot} is in <{season}> time.")
    return season

def _sort_night_time(start_hour: Angle, stop_hour: Angle, current_day: Time) -> Tuple[float, float]:
    """Split the time between two hours of a day into day time and night time.

    Parameters
    ----------
    start_hour : Angle
        Start of the interval, as an hour angle.
    stop_hour : Angle
        End of the interval, as an hour angle.
    current_day : Time
        Date used to select the seasonal night boundaries in ``NIGHT_TIME``.

    Returns
    -------
    Tuple[float, float]
        ``(day_time, night_time)`` in hours; they add up to the interval length.
    """
    morning_night, evening_night = NIGHT_TIME[_season_from_time(current_day)]

    interval_hours = (stop_hour - start_hour).hour

    # Hours elapsed before the end of the morning night (never negative)
    morning_part = np.max((0, (morning_night[1] - start_hour).hour))
    # Hours elapsed after the beginning of the evening night (never negative)
    evening_part = np.max((0, (stop_hour - evening_night[0]).hour))

    night_time = morning_part + evening_part
    # When the interval lies entirely within the morning or the evening night,
    # the sum above exceeds the interval length: remove the excess.
    excess = np.min((0, (interval_hours - night_time)))
    night_time = night_time + excess

    day_time = interval_hours - night_time
    return day_time, night_time

# ============================================================= #
# -------------------------- XLSFile -------------------------- #
class XLSFile:
    """Open-time booking spreadsheet, read with :mod:`openpyxl`.

    Every sheet of the workbook (except one named 'example') describes one
    month: from the third row on, the first column holds the day and the 24
    following columns hold the hourly cells. A booking is a (possibly merged)
    cell whose text looks like ``<KP> <start>-<stop> (<comment>)``.

    Attributes
    ----------
    filename : str
        Path of the spreadsheet.
    workbook
        The loaded openpyxl workbook.
    sheets : list
        The month worksheets.
    data : dict
        ``data[sheet_title]["time"]`` is the list of days of the sheet and
        ``data[sheet_title]["data"]`` the matching (n_days, 24) array of
        cell values.
    """

    def __init__(self, filename: str):
        """Load the workbook ``filename`` and parse every month sheet."""
        self.filename = filename
        self.data = {}

        # Load data
        self.workbook = openpyxl.load_workbook(filename)
        self.sheets = self._read_sheet()

        for sheet in self.sheets:
            dates, month_data = self._parse_month(sheet)
            self.data[sheet.title] = {"time": dates, "data": month_data}

        log.info(f"{filename} read and parsed.")

    def to_ical(self) -> Dict[str, Calendar]:
        """Convert the bookings to one calendar per Key Project.

        Cells that cannot be parsed are reported in the log and skipped.
        Two events of the same KP that touch each other (the first one ends
        when the second one begins, e.g. around midnight) and share the same
        comment are merged.

        Returns
        -------
        Dict[str, Calendar]
            The calendars, indexed by KP code.
        """
        booking_pattern = re.compile(
            r"^(?P<kp>(ES|SP|RP|LT)\S{2})\s+(?P<start>(\d+|\d+:\d+))\s*-\s*(?P<stop>(\d+|\d+:\d+))(?P<comment>\s+\(.+\))?(\s+)?$"
        )

        # Create a calendar per KP
        calendars = {}

        # Gather all data
        days = functools.reduce(
            operator.add,
            [self.data[month_key]["time"] for month_key in self.data]
        )
        data = np.vstack([self.data[month_key]["data"] for month_key in self.data])

        # Fill calendars while looping through data
        for day, daily_data in zip(days, data):
            # Find cells that have been filled
            # ('!= None' is deliberate: element-wise comparison on an object array)
            values = np.unique(daily_data[daily_data != None])
            for value in values:
                fullmatch = booking_pattern.search(value)
                if fullmatch is None:
                    # Nothing can be extracted from this cell: report and skip it
                    log.error(f"Problem parsing: '{value}'")
                    continue

                # Add a new calendar if a different KP name is found
                kp = fullmatch["kp"]
                if kp not in calendars:
                    calendars[kp] = Calendar()

                start_time = day + datetime.timedelta(
                    hours=Angle(fullmatch["start"], unit="hourangle").hour
                )
                stop_time = day + datetime.timedelta(
                    hours=Angle(fullmatch["stop"], unit="hourangle").hour
                )

                # Add an event to the corresponding calendar
                calendars[kp].events.add(
                    self._new_calendar_event(
                        name=value,
                        start=start_time,
                        stop=stop_time,
                        comment=fullmatch["comment"]
                    )
                )

        for kp in calendars:
            # Group bookings before / after midnight
            ordered_events = sorted(calendars[kp].events)
            for event, next_event in zip(ordered_events, ordered_events[1:]):
                same_comment = (
                    self._comment_from_name(next_event.name)
                    == self._comment_from_name(event.name)
                )
                if (next_event.begin == event.end) and same_comment:
                    # Merge the events and remove the old one
                    next_event.begin = event.begin
                    next_event.name = f"{event.name}*{next_event.name}"
                    calendars[kp].events.remove(event)

        return calendars

    def to_ics(self, save_path: str = "") -> None:
        # Assumes the file is perfect!
        """Convert the Excel document to an ICS calendar.
        The name of the ICS file will be ``<kp>.ics`` where ``<kp>``
        is the Key project code found in the Excel file.

        Parameters
        ----------
        save_path : str, optional
            Path where the ICS file will be saved, by default ""
        """
        calendars = self.to_ical()

        # Write all calendars
        for kp in calendars:
            output = os.path.join(save_path, f"{kp}.ics")
            with open(output, "w") as wf:
                wf.writelines(calendars[kp].serialize_iter())
            log.info(f"iCalendar {os.path.abspath(output)} written.")

    def info(self, kp: str = "", strict: bool = False):
        """Log the booked hours (total, day and night) and the syntax errors.

        Parameters
        ----------
        kp : str, optional
            Only account for the bookings of this Key Project,
            by default "" (i.e. every Key Project).
        strict : bool, optional
            If ``True``, cells that do not exactly match
            ``<KP> <start>-<stop> (<comment>)`` are counted as errors and
            skipped, by default ``False``.
        """
        strict_pattern = re.compile(
            r"^(?P<kp>(ES|SP|RP|LT)\S{2})\s(?P<start>(\d+|\d+:\d+))-(?P<stop>(\d+|\d+:\d+))(?P<comment>\s\(.+\))?$"
        )
        kp_pattern = re.compile(r"^(ES|SP|RP|LT)\S{2}")
        hours_pattern = re.compile(r"^(ES|SP|RP|LT)\S{2}\s+(?P<start>\S+)\s*-\s*(?P<stop>\S+)")

        hour_total = 0
        day_total = 0
        night_total = 0
        errors = 0
        n_values = 0
        kp_list = []
        hour_column = np.arange(0, 24)

        for month_key in self.data:
            month_days = self.data[month_key]["time"]
            month_data = self.data[month_key]["data"]
            month_hours = 0
            month_night = 0
            month_day = 0

            for day, day_data in zip(month_days, month_data):
                where = f"(Month {month_key}, Day {day})"

                # Get the different cell values and their occurences
                filled = day_data != None  # element-wise comparison on an object array
                values, indices, counts = np.unique(
                    day_data[filled], return_counts=True, return_index=True
                )
                # Column (i.e. hour) at which each distinct value first appears
                hour_indices = hour_column[filled][indices]
                n_values += values.size

                for value, index, count in zip(values, hour_indices, counts):
                    # Super strict check
                    if strict and (strict_pattern.search(value) is None):
                        log.error(f"{where} - Strict syntax check failed on '{value}'")
                        errors += 1
                        continue

                    # Read KP
                    kp_match = kp_pattern.match(value)
                    if kp_match is None:
                        log.error(f"{where} - Impossible to read KP '{value}'")
                        errors += 1
                        continue
                    current_kp = kp_match.group()
                    if current_kp not in kp_list:
                        kp_list.append(current_kp)

                    # Skip if not the selected kp
                    if (kp != "") and (current_kp != kp):
                        continue

                    # Read the start and stop hours
                    hours_match = hours_pattern.search(value)
                    if hours_match is None:
                        log.error(f"{where} - Impossible to read start/stop times '{value}'")
                        errors += 1
                        continue
                    start = hours_match["start"]
                    stop = hours_match["stop"]

                    # Compute delta-time
                    try:
                        start_val = Angle(start, unit="hourangle")
                        stop_val = Angle(stop, unit="hourangle")
                        hours = (stop_val - start_val).hour
                    except Exception:
                        log.error(f"{where} - not understanding start={start} stop={stop} values...")
                        errors += 1
                        continue

                    # Differentiate night from day time
                    day_hours, night_hours = _sort_night_time(
                        start_hour=start_val,
                        stop_hour=stop_val,
                        current_day=Time(day, format="datetime")
                    )

                    # Check that the start is at the correct position
                    if int(start_val.hour) != index:
                        errors += 1
                        log.error(f"{where} - Legend hour {start_val.hour} does not match column {index}h")

                    # Check that the time duration corresponds to the number of merged cells
                    expected_cells = int(np.ceil(hours))
                    if count != expected_cells:
                        # If the stop time doesn't fall on a round hour (more
                        # than one minute past it), 1 additional cell is used.
                        # Computed on plain floats: the hour *time* unit cannot
                        # be used to build an Angle.
                        stop_not_round = (stop_val.hour % 1) > (1 / 60)
                        if not (stop_not_round and (count == expected_cells + 1)):
                            errors += 1
                            log.error(
                                f"{where} - {count} cells instead of {expected_cells} for value '{value}'"
                            )

                    # Add the time to the total
                    month_day += day_hours
                    month_night += night_hours
                    month_hours += hours

            log.info(f"Month {month_key}: {month_hours:.2f} hours (day = {month_day:.2f} hrs, night = {month_night:.2f} hrs)")
            hour_total += month_hours
            day_total += month_day
            night_total += month_night

        log.info(f"Total = {hour_total:.2f} hours (day = {day_total:.2f}, night = {night_total:.2f})")
        log.info(f"{errors=} / entries={n_values}")
        log.info(f"Key project(s) found: {kp_list}")

    def _read_sheet(self) -> List[Worksheet]:
        """Return the month worksheets of the workbook.

        Raises
        ------
        ValueError
            If the workbook does not hold exactly 6 sheets besides 'example'.
        """
        sheets = []
        for sheet_name in self.workbook.sheetnames:
            # Skip the example sheet
            if sheet_name.lower() == "example":
                log.debug(f"'{sheet_name}' sheet skiped.")
                continue
            log.debug(f"Parsing '{sheet_name}' from '{self.filename}'")
            sheets.append(self.workbook[sheet_name])

        if len(sheets) != 6:
            raise ValueError(f"{len(sheets)} sheets were returned instead of 6 while reading {self.filename}.")
        return sheets

    def _parse_month(self, sheet: Worksheet) -> Tuple[list, np.ndarray]:
        """Read the days of a month sheet and their 24 hourly cell values.

        Returns
        -------
        Tuple[list, np.ndarray]
            The days (first column) and the array of the cell values.
        """
        sheet_rows = []
        dates = []
        # Loop over the rows of the sheet
        for row in sheet.iter_rows(min_row=3, min_col=1, max_col=24 + 1):
            # Only consider rows belonging to a month days
            current_day = row[0].value
            if not isinstance(current_day, datetime.datetime):
                break
            dates.append(current_day)
            sheet_rows.append(self._row_to_list(row[1:]))

        # Build the entire sheet and return a numpy array
        return dates, np.array(sheet_rows)

    @staticmethod
    def _row_to_list(row: Tuple[Cell]) -> list:
        """Return the values of a row, merged cells repeating their first cell value.

        Raises
        ------
        ValueError
            If the row does not contain 24 cells.
        """
        # Find out if and where there are merged cells
        merged_mask = np.array([isinstance(cell, MergedCell) for cell in row], dtype=bool)

        # If there are merged cells, compute the indices of the first cells
        # of merged cells (they are not of type MergedCell): each run of
        # consecutive merged indices is preceded by the cell holding the value.
        merged_indices = merged_mask.nonzero()[0]
        try:
            run_starts = np.where(np.diff(merged_indices) != 1)[0] + 1
            first_cell_indices = np.array(
                [group[0] for group in np.split(merged_indices, run_starts)]
            ) - 1
        except IndexError:
            # No merged cell in this row
            first_cell_indices = np.array([])

        # Loop over each cell in the row
        row_values = []
        for i, cell in enumerate(row):
            if merged_mask[i]:
                # if the cell is merged, get the value from the closest first cell before it
                previous_cell_index = first_cell_indices[first_cell_indices < i][-1]
                row_values.append(row[previous_cell_index].value)
            else:
                row_values.append(cell.value)

        # Make sure that the 24 hours have been correctly read
        if len(row_values) != 24:
            raise ValueError(f"Row {row}: input different than 24 hours..")

        return row_values

    @staticmethod
    def _comment_from_name(name: str) -> str:
        """Return the text found between parentheses in ``name`` ("" if none)."""
        comment_search = re.search(pattern=r"\((?P<comment>.+)\)", string=name)
        return "" if comment_search is None else comment_search["comment"]

    @staticmethod
    def _new_calendar_event(name: str, start: datetime.datetime, stop: datetime.datetime, comment: str = "") -> Event:
        """Build a non-transparent calendar event from its name, limits and description."""
        event = Event()
        event.name = name
        event.begin = start
        event.end = stop
        event.transparent = False
        event.description = comment
        return event
# ============================================================= #
# ------------------------- NenuEvent ------------------------- #
# Time shift applied to an event when it is moved one day earlier or later
SIDEREAL_DAY_TIMEDELTA = datetime.timedelta(hours=23, minutes=56, seconds=4.0905)
# SIDEREAL_DAY_TIMEDELTA = datetime.timedelta(minutes=30, seconds=4.0905)
class NenuEvent:
    """Calendar event of a Key Project, along with its plot representation.

    Attributes
    ----------
    event
        The underlying calendar event (``begin``, ``end`` and ``name`` are used).
    kp_name : str
        Code of the Key Project owning the event.
    color
        Face color of the box drawn for this event.
    draw_box
        The span drawn by :meth:`draw` (``None`` until then).
    draw_text
        The label drawn by :meth:`draw` (``None`` if no label was drawn).
    selected : bool
        Whether the event is currently selected in an interactive plot.
    """

    def __init__(self, event: "Event", kp_name: str, color: str):
        self.event = event
        self.color = color
        self.kp_name = kp_name
        self.draw_box = None
        self.draw_text = None
        self.selected = False

    def __repr__(self) -> str:
        return f"<NenuEvent({self.event.begin.datetime} -- {self.event.end.datetime} KP='{self.kp_name}')>"

    def __str__(self) -> str:
        return f"<NenuEvent({self.event.begin.datetime} -- {self.event.end.datetime} KP='{self.kp_name}')>"

    def __lt__(self, other):
        """Order the events by their start time."""
        # Comparing start against start keeps the ordering consistent even
        # when two events overlap (needed for a reliable sort).
        return self.event.begin < other.event.begin

    def __contains__(self, other) -> bool:
        """Checks whether self and other are sharing a common time interval.

        Two intervals overlap if the sum of their durations is larger than
        the largest distance between the end of one and the start of the
        other, i.e. if DT_1 + DT_2 > max( |f_1 - i_2|, |f_2 - i_1| ).
        Events that merely touch each other do not overlap.

        Parameters
        ----------
        other : NenuEvent
            The event to compare with.

        Returns
        -------
        bool
            ``True`` if the two events overlap.
        """
        dt_1 = self.event.end.datetime - self.event.begin.datetime
        dt_2 = other.event.end.datetime - other.event.begin.datetime
        diff_1 = abs(self.event.end.datetime - other.event.begin.datetime)
        diff_2 = abs(other.event.end.datetime - self.event.begin.datetime)
        return bool(dt_1 + dt_2 > max(diff_1, diff_2))

    def contains(self, mouse_event) -> bool:
        """Tell whether the data coordinates of ``mouse_event`` fall inside the drawn box."""
        # The five vertices are read as: bottom-left, top-left, (unused), bottom-right, (unused)
        bl, tl, _, br, _ = self.draw_box.get_xy()
        ex, ey = (mouse_event.xdata, mouse_event.ydata)
        return (ex >= bl[0]) & (ex <= br[0]) & (ey >= bl[1]) & (ey <= tl[1])

    def connect_to_plot_events(self):
        """Register the click and key-press callbacks on the figure canvas.

        Raises
        ------
        Exception
            If the event has not been drawn yet.
        """
        if self.draw_box is None:
            raise Exception("Call draw() method first.")
        canvas = self.draw_box.figure.canvas
        self.cid_click = canvas.mpl_connect("button_press_event", self._on_click)
        self.cid_press = canvas.mpl_connect("key_press_event", self._on_key_press)

    def draw(self, ax: "mpl.axes.Axes", fontsize: int = 6, text_rotation: int = 90, text_max_size: int = None):
        """Draw the event as a vertical span of ``ax``, labelled with its name.

        Parameters
        ----------
        ax : mpl.axes.Axes
            The axes to draw on.
        fontsize : int, optional
            Font size of the label, by default 6
        text_rotation : int, optional
            Rotation of the label in degrees, by default 90
        text_max_size : int, optional
            If given, the label is wrapped every ``text_max_size`` characters
            (and 'filler' events are renamed), by default None
        """
        intersections_before, intersections_after = (0, 0)#self._n_intersections_event(event) TODO reactivate
        n_intersections = intersections_before + intersections_after + 1
        y_width = 1 / n_intersections
        ymin = intersections_before * y_width
        ymax = ymin + y_width

        # Show the block rectangle
        self.draw_box = ax.axvspan(
            self.event.begin,
            self.event.end,
            ymin=ymin,
            ymax=ymax,
            facecolor=self.color,
            edgecolor="black",
            alpha=0.8,
            zorder=40,
        )

        # Show the observation block title
        text = self.event.name
        if text_max_size is None:
            pass
        elif text.lower() in ("filler", "filer"):
            text = "LT02 Filler time"
        else:
            text = "\n".join(text[i:i+text_max_size] for i in range(0, len(text), text_max_size))

        # The label is only drawn if the middle of the event lies within the axes limits
        x_min, x_max = ax.get_xlim()
        text_pos = (self.event.begin + (self.event.end - self.event.begin)/2)
        text_pos_mdate = mdates.date2num(text_pos.datetime)
        if (x_min <= text_pos_mdate) & (text_pos_mdate < x_max):
            self.draw_text = ax.text(
                x=text_pos.datetime,
                y=ymin + y_width / 2,
                s=text,
                horizontalalignment="center",
                verticalalignment="center",
                rotation=text_rotation,
                color="black",
                fontsize=fontsize,
                zorder=50
            )

    def _erase_drawing(self):
        """Remove the drawn box from its axes."""
        print(self.event.name)
        self.draw_box.remove()

    def _shift_event(self, delta: datetime.timedelta) -> None:
        """Move the event and its drawing by ``delta``."""
        # The limit moving away from the other one is updated first, so that
        # the event never goes through a state where it ends before it begins.
        if delta < datetime.timedelta(0):
            self.event.begin += delta
            self.event.end += delta
        else:
            self.event.end += delta
            self.event.begin += delta

        vertices = self.draw_box.get_xy()
        vertices[np.array([0, 1, 4]), 0] = mdates.date2num(self.event.begin)
        vertices[np.array([2, 3]), 0] = mdates.date2num(self.event.end)
        self.draw_box.set_xy(vertices)

        # No label exists if the event center was outside the axes when drawn
        if self.draw_text is not None:
            self.draw_text.set_x(
                mdates.date2num(self.event.begin + (self.event.end - self.event.begin)/2)
            )

    def _shift_event_one_day_earlier(self) -> None:
        """Move the event one sidereal day towards the past."""
        self._shift_event(-SIDEREAL_DAY_TIMEDELTA)

    def _shift_event_one_day_later(self) -> None:
        """Move the event one sidereal day towards the future."""
        self._shift_event(SIDEREAL_DAY_TIMEDELTA)

    def _on_click(self, event):
        """Select the event if the click falls inside its box, unselect it otherwise."""
        if event.inaxes is None:
            return

        if not self.contains(event):
            if self.selected:
                # If it was previously selected, unselect
                self.selected = False
                self._change_facecolor(color=self.color)
                print(f"Un_selecting {self.event.name} !")
            return

        self.selected = True
        self._change_facecolor(color="gray")
        print(f"Selected {self.event.name} !")

    def _on_key_press(self, event):
        """Shift the selected event with the 'up' (earlier) and 'down' (later) keys."""
        if (not self.selected) or (event.key not in ["up", "down"]):
            return

        if event.key == "up":
            print("go to past")
            self._shift_event_one_day_earlier()
        elif event.key == "down":
            print("go to future")
            self._shift_event_one_day_later()

        self._update_draw()

    def _change_facecolor(self, color) -> None:
        """Change the face color of the drawn box and refresh the figure."""
        background = self.draw_box.figure.canvas.copy_from_bbox(self.draw_box.axes.bbox)
        self.draw_box.set(facecolor=color)
        self._update_draw(background)

    def _update_draw(self, background=None, ax: "mpl.axes.Axes" = None) -> None:
        """Redraw the figure holding the event.

        Parameters
        ----------
        background : optional
            A region previously copied from the canvas; if given it is
            restored and blitted after the redraw, by default None
        ax : mpl.axes.Axes, optional
            The axes whose bounding box is blitted, by default the axes of
            the drawn box.
        """
        if ax is None:
            ax = self.draw_box.axes
        figure = self.draw_box.figure
        figure.canvas.draw()
        if background is not None:
            # NOTE(review): the region was captured before the change being
            # displayed; confirm that restoring it here is intended.
            figure.canvas.restore_region(background)
            figure.canvas.blit(ax.bbox)

# ============================================================= #
# ----------------------- NenuCalendar ------------------------ #
[docs] class NenuCalendar:
[docs] def __init__(self, events: List[NenuEvent]): self.events = events
# def __init__(self, calendars: Dict[str, Calendar]): # self.kp_names = list(calendars.keys()) # self.calendars = calendars # self.kp_colors = mpl.cm.get_cmap("Spectral")(np.linspace(0, 1, len(self.kp_names))) @classmethod def from_ics(cls, *ics_files: str): calendars = {} for ics_file in ics_files: # Search the KP name ics_basename = os.path.basename(ics_file) match = re.search(pattern=r"(ES|SP|RP|LT)\S{2}", string=ics_basename) if match is None: raise ValueError(f"ICS file name {ics_basename} does not contain a KP name.") kp = match.group() # Read the ICS file with open(ics_file, "r") as rfile: ics_content = rfile.read() calendars[kp] = Calendar(ics_content) events = [] kp_colors = mpl.colormaps["Spectral"](np.linspace(0, 1, len(calendars.keys()))) for kp, kp_color in zip(calendars.keys(), kp_colors): if len(calendars.keys()) == 1: kp_color = "tab:blue" for event in calendars[kp].events: events.append(NenuEvent(event, kp_name=kp, color=kp_color)) return cls(events) @classmethod def from_csv(cls, *csv_files: str): calendars = {} for csv_file in csv_files: # Search the KP name csv_basename = os.path.basename(csv_file) match = re.search(pattern=r"(ES|SP|RP|LT)\S{2}", string=csv_basename) if match is None: raise ValueError(f"CSV file name {csv_basename} does not contain a KP name.") kp = match.group() if kp not in calendars: calendars[kp] = Calendar() # Read the CSV file csv_content = Table.read(csv_file) # Assumed first 3 columns are start, stop, comment start_col, stop_col, comment_col = csv_content.columns[:3] for i in range(len(csv_content)): event = Event() event.name = csv_content[i][comment_col] event.begin = Time(csv_content[i][start_col]).datetime event.end = Time(csv_content[i][stop_col]).datetime event.transparent = False event.description = "" calendars[kp].events.add(event) events = [] kp_colors = mpl.colormaps["Spectral"](np.linspace(0, 1, len(calendars.keys()))) for kp, kp_color in zip(calendars.keys(), kp_colors): if len(calendars.keys()) == 1: 
kp_color = "tab:blue" for event in calendars[kp].events: events.append(NenuEvent(event, kp_name=kp, color=kp_color)) return cls(events) @classmethod def from_xls(cls, *xls_files: str): kp_calendars = {} for xls_file in xls_files: # Search the KP name xls_basename = os.path.basename(xls_file) match = re.search(pattern=r"(ES|SP|RP|LT)\S{2}", string=xls_basename) if match is None: log.warning(f"ICS file name {xls_basename} does not contain a KP name.") kp = "none" else: kp = match.group() xls = XLSFile(filename=xls_file) kp_calendars = {**kp_calendars, **xls.to_ical()} # return cls(kp_calendars) events = [] kp_colors = mpl.colormaps["Spectral"](np.linspace(0, 1, len(kp_calendars.keys()))) for kp, kp_color in zip(kp_calendars.keys(), kp_colors): for event in kp_calendars[kp].events: events.append(NenuEvent(event, kp_name=kp, color=kp_color)) return cls(events) def info(self) -> None: # TBD: How much time is scheduled total_hours = 0 daily_hours = 0 night_hours = 0 for evt in self.events: start = evt.event.begin.datetime start_hour = start.hour + start.minute / 60 + start.second / 3600 stop = evt.event.end.datetime stop_hour = stop.hour + stop.minute / 60 + stop.second / 3600 if stop_hour <= start_hour: d_hours1, n_hours1 = _sort_night_time( start_hour=Angle(start_hour, unit="hourangle"), stop_hour=Angle(24, unit="hourangle"), current_day=Time(evt.event.begin.datetime, format="datetime") ) d_hours2, n_hours2 = _sort_night_time( start_hour=Angle(0, unit="hourangle"), stop_hour=Angle(stop_hour, unit="hourangle"), current_day=Time(evt.event.begin.datetime, format="datetime") ) d_hours = d_hours1 + d_hours2 n_hours = n_hours1 + n_hours2 else: start_val = Angle(start_hour, unit="hourangle") stop_val = Angle(stop_hour, unit="hourangle") # hours = (stop_val - start_val).hour d_hours, n_hours = _sort_night_time( start_hour=start_val, stop_hour=stop_val, current_day=Time(evt.event.begin.datetime, format="datetime") ) daily_hours += d_hours night_hours += n_hours td = 
evt.event.end.datetime - evt.event.begin.datetime evt_hours = td.days * 24 + td.seconds / 3600 if not np.isclose(d_hours + n_hours, evt_hours, rtol=1e-10, atol=1/60): # 1 min tolerance log.warning(f"{evt} mismatch {evt_hours} vs. {d_hours=} {n_hours=}...") total_hours += evt_hours log.info(f"total={total_hours} hours (day={daily_hours}, night={night_hours})") # log.info(f"total={total_hours} hours")# (day={daily_hours}, night={night_hours})")
[docs] def write_summary_csv(self, path: str = "", filename: str = "") -> None: """Write a CSV file summarizing the events. Parameters ---------- path : str, optional The output path to store the summary, by default "" filename : str, optional The name of the output CSV (should end with .csv), no path name, by default "" (i.e. 'calendar_summary.csv') """ sorted_events = sorted(self.events) if filename != "": assert filename.endswith(".csv"), f"Filename {filename} should end with .csv" filename = os.path.join(path, os.path.basename(filename)) else: filename = os.path.join(path, "calendar_summary.csv") with open(filename, "w") as wfile: wfile.write(f"start_iso,stop_iso,kp_code,name\n") for event in sorted_events: event_name = event.event.name # Extract the KP name kp_match = re.search(pattern=r"(ES|SP|RP|LT)\S{2}", string=event_name) if kp_match is None: raise Exception(f"Unable to read KP name from '{event_name}'") kp = kp_match.group() if kp != event.kp_name: raise Exception(f"Inconsistency between kp_name {event.kp_name} and kp written {kp}") start = Time(event.event.begin.datetime, format="datetime") start.precision = 0 stop = Time(event.event.end.datetime, format="datetime") stop.precision = 0 # Write the CSV file wfile.write(f"{start.iso},{stop.iso},{kp},{event_name}\n") log.info(f"{filename} written.")
[docs] def write_vcr_csv(self, path: str = "", add_error: bool = True) -> None: """Write a CSV file from the events loaded in this NenuCalendar instance. The CSV format is such that it can be imported to book KP slots in the VCR. Parameters ---------- path : str, optional The output path to store the booking file, by default "" add_error : bool, optional Duplicater the last line so that it results in an error (i.e., to check that this is the last remaining error while importing the data to the VCR), by default True """ sorted_events = sorted(self.events) current_events, next_events = tee(sorted_events, 2) next_events = chain(islice(next_events, 1, None), [None]) yyyy_mm_start = sorted_events[0].event.begin.datetime.strftime("%Y_%m") yyyy_mm_stop = sorted_events[-1].event.end.datetime.strftime("%Y_%m") filename = os.path.join(path, f"Booking_{yyyy_mm_start}_{yyyy_mm_stop}.csv") log.info(f"Writing {filename}...") with open(filename, "w") as wfile: for event, next_event in zip(current_events, next_events): start = Time(event.event.begin.datetime, format="datetime") start.precision = 0 stop = Time(event.event.end.datetime, format="datetime") if not (next_event is None): # Not end of the loop # Check that there is no overlap! if event in next_event: raise Exception(f"There is an overlap between {event} and {next_event}!") # Gather events belonging to the same KP that are consectutive in time # This will modify the stop time while (event.kp_name == next_event.kp_name) and (event.event.end == next_event.event.begin): # Check that there is no overlap! if event in next_event: raise Exception(f"There is an overlap between {event} and {next_event}!") if next_event is None: # End of the loop break log.info(f"Merging {event} and {next_event}") stop = Time(next_event.event.end.datetime, format="datetime") event = next(current_events) next_event = next(next_events) stop.precision = 0 # TODO Remove the conditions next time... 
event_name = "LT02 filler" if (event.event.name.lower() == "filler" or event.event.name.lower() == "filer") else event.event.name if "LT00" in event.event.name: event_name = event_name.replace("LT00", "ES00") if "RP5A" in event.event.name: event_name = event_name.replace("RP5A", "RP3A") # Extract the KP name kp_match = re.search(pattern=r"(ES|SP|RP|LT)\S{2}", string=event_name) if kp_match is None: raise Exception(f"Unable to read KP name from '{event_name}'") kp = kp_match.group() if kp != event.kp_name: raise Exception(f"Inconsistency between kp_name {event.kp_name} and kp written {kp}") # Write the CSV file wfile.write(f"{start.iso},{stop.iso},{kp},NenuFAR booking\n") if add_error: # Duplicate last line to generate an error wfile.write(f"{start.iso},{stop.iso},{kp},NenuFAR booking\n")
def month_plot(self, fig_name: str):
    """Draw the events month by month and store the result in a PDF file.

    One page is produced per (year, month) holding at least one event.
    Each page is made of one row per day, spanning 24 hours along the
    x-axis, on which the events intersecting that day are drawn.

    Parameters
    ----------
    fig_name : str
        Name of the PDF file to write, it must end with ".pdf".

    Raises
    ------
    ValueError
        If `fig_name` does not end with ".pdf".
    """
    if not fig_name.endswith(".pdf"):
        raise ValueError("fig_name must end with .pdf")

    utc = pytz.UTC
    one_day = TimeDelta(1, format="jd")

    # Sort events by (year, month number).
    # An event is listed in the month where it begins and, if different,
    # in the month where it ends. Year and month are compared together so
    # that an event ending in the same month of another year is not missed.
    monthly_events = {}
    for evt in self.events:
        begin = evt.event.begin.datetime
        end = evt.event.end.datetime
        begin_key = (begin.year, begin.month)
        end_key = (end.year, end.month)
        monthly_events.setdefault(begin_key, []).append(evt)
        if end_key != begin_key:
            monthly_events.setdefault(end_key, []).append(evt)

    # The context manager closes the PDF even if a page fails to be drawn
    with bkpdf.PdfPages(fig_name) as pdf_document:

        # For each month, make a plot of day vs UTC
        for year, month_number in sorted(monthly_events):
            month = calendar.month_name[month_number]
            log.info(f"Plotting {month}...")

            # Find out how many days are in the specific month
            _, days_in_month = calendar.monthrange(year, month_number)

            fig, axs = plt.subplots(
                nrows=days_in_month,
                ncols=1,
                figsize=(20, 0.4*days_in_month),
            )
            try:
                fig.subplots_adjust(hspace=0, top=0.95)
                st = fig.suptitle(f"{month} {year}")

                time_min = Time(f"{year}-{month_number:02d}-01T00:00:00")

                for i, ax in enumerate(axs):
                    # Plot time limits
                    ax_time_min = time_min + i * one_day
                    ax_time_max = time_min + (i + 1) * one_day
                    ax.set_xlim(
                        left=ax_time_min.datetime,
                        right=ax_time_max.datetime
                    )
                    # A locator/formatter pair is created for each axis
                    hours = mdates.HourLocator(interval = 1)
                    h_fmt = mdates.DateFormatter("%H:%M")
                    ax.xaxis.set_major_locator(hours)
                    ax.xaxis.set_major_formatter(h_fmt)
                    ax.grid(axis="x", color="0.9")
                    ax.set_yticklabels([])
                    ax.set_ylabel(f"{i + 1:02d}")
                    # Only the last row displays the hour labels
                    if i != days_in_month - 1:
                        ax.set_xticklabels([])

                    day_range = (
                        ax_time_min.datetime.replace(tzinfo=utc),
                        ax_time_max.datetime.replace(tzinfo=utc)
                    )
                    for evt in monthly_events[(year, month_number)]:
                        if not self._time_intersection(
                            time1=(evt.event.begin, evt.event.end),
                            time2=day_range
                        ):
                            # Event outside of plot range
                            continue
                        evt.draw(ax=ax, fontsize=6, text_rotation=0, text_max_size=16)

                pdf_document.savefig(fig, bbox_inches="tight", bbox_extra_artists=[st], dpi=100)
            finally:
                # Close this very figure, whatever the current pyplot figure is
                plt.close(fig)


# def plot(self, tmin: Time = None, tmax: Time = None, **kwargs):
#     utc_frame = pytz.UTC
#     time_min = Time(tmin.isot.split("T")[0])
#     time_max = tmax
#     n_subplots = int(np.ceil((tmax - tmin) / TimeDelta(1, format="jd")))
#     fig, axs = plt.subplots(
#         nrows=n_subplots,
#         ncols=1,
#         figsize=kwargs.get("figsize", (8, 3*n_subplots))
#     )
#     drawn_events = []
#     def on_click(event):
#         # Figure out the one
#         nonlocal drawn_events
#         for evt in drawn_events:
#             if evt.contains(event):
#                 print("yeah", evt.event.name)
#         # print('%s click: button=%d, x=%d, y=%d, xdata=%f, ydata=%f' %
#         #     ('double' if event.dblclick else 'single', event.button,
#         #     event.x, event.y, event.xdata, event.ydata))
#     # selected_artist = None
#     # previous_color = None
#     # def on_key_press(event):
#     #     nonlocal selected_artist
#     #     if selected_artist is None:
#     #         return
#     #     elif event.key == "up":
#     #         print("go to past")
#     #         # self._shift_event_one_day_earlier(event)
#     #     elif event.key == "down":
#     #         print("go to future")
#     #         # self._shift_event_one_day_later(event)
#     #     else:
#     #         return
#     # def unselect_block_event():
#     #     nonlocal selected_artist
#     #     nonlocal previous_color
#     #     if selected_artist is None: return
#     #     selected_artist.set(facecolor=previous_color)
#     #     selected_artist = None
#     # def select_block_event(artist):
#     #     nonlocal selected_artist
#     #     nonlocal previous_color
#     #     selected_artist = artist
#     #     previous_color = selected_artist.get_facecolor()
#     #     print(selected_artist.__dir__())
#     #     # find the corresponding event
#     #     selected_artist.set(facecolor="tab:red")
#     # def onpick(event):
#     #     if event.mouseevent.button != 1: return
#     #     print(event)
#     #     event_block = event.artist
#     #     unselect_block_event()
#     #     select_block_event(event_block)
#     #     fig.canvas.draw()#plt.draw() #redraw
#     # cid = fig.canvas.mpl_connect("button_press_event", on_click)
#     # fig.canvas.mpl_connect("pick_event", onpick)
#     # fig.canvas.mpl_connect("key_press_event", on_key_press)
#     for i, ax in enumerate(axs):
#         # Plot time limits
#         ax_time_min = time_min + i * TimeDelta(1, format="jd")
#         ax_time_max = time_min + (i + 1) * TimeDelta(1, format="jd")
#         ax.set_xlim(
#             left=ax_time_min.datetime,
#             right=ax_time_max.datetime
#         )
#         # for kp, kp_color in zip(self.kp_names, self.kp_colors):
#         #for event in self.calendars[kp].events:
#         for evt in self.events:
#             if not self._time_intersection(
#                 time1=(evt.event.begin, evt.event.end),
#                 # time2=(ax_time_min.datetime.replace(tzinfo=utc_frame), ax_time_max.datetime.replace(tzinfo=utc_frame))
#                 time2=(time_min.datetime.replace(tzinfo=utc_frame), time_max.datetime.replace(tzinfo=utc_frame))
#             ):
#                 # Event outside of plot range
#                 continue
#             # self._plot_event(ax=ax, event=event, color=kp_color)
#             evt.draw(ax=ax)
#             evt.connect_to_plot_events()
#             #drawn_events.append(evt)
#     plt.tight_layout()
#     plt.show()
#     # plt.close("all")


@staticmethod
def _time_intersection(time1: Tuple[datetime.datetime, datetime.datetime], time2: Tuple[datetime.datetime, datetime.datetime]) -> bool:
    """Compares two time intervals (each made of a length-2 tuple) and
    returns a boolean telling if the two intervals intersect with each other.

    The test is performed by comparing the total duration of the time
    intervals with the duration of the interval made of the earliest start
    and the latest stop.
    If the latter is shorter than the total duration, it means that the
    intervals are overlapping. Intervals that merely touch (one stops
    exactly when the other starts) are not considered as overlapping.

    Parameters
    ----------
    time1 : Tuple[datetime.datetime, datetime.datetime]
        (start, stop)
    time2 : Tuple[datetime.datetime, datetime.datetime]
        (start, stop)

    Returns
    -------
    `bool`
        Whether the time intervals overlap with each other.
    """
    duration1 = time1[1] - time1[0]
    duration2 = time2[1] - time2[0]
    greatest_duration = max(time1[1], time2[1]) - min(time1[0], time2[0])
    # Strict comparison: equality means the intervals are only adjacent
    return bool(duration1 + duration2 > greatest_duration)


def _n_intersections_event(self, event: Event) -> Tuple[int, int]:
    """Count the events whose time range intersects the one of `event`.

    The events of every calendar in ``self.calendars`` are browsed in the
    order given by ``self.kp_names``. Intersecting events met before
    `event` itself and those met after are counted separately.

    Parameters
    ----------
    event : Event
        The event to compare with all the others.

    Returns
    -------
    Tuple[int, int]
        (number of intersecting events met before `event`,
        number of intersecting events met after `event`)
    """
    n_before = 0
    n_after = 0
    event_crossed = False
    for kp in self.kp_names:
        for other_event in self.calendars[kp].events:
            # if (other_event.begin >= event.begin) & (other_event.end <= event.end):
            #     # other embedded within event
            #     n += 1
            # elif (event.begin >= other_event.begin) & (event.end <= other_event.end):
            #     # event embedded within other
            #     n += 1
            # elif (event.begin >= other_event.begin) & (event.begin <= other_event.end):
            #     # other overlapping start of event
            #     n += 1
            # elif (event.end >= other_event.begin) & (event.end <= other_event.end):
            #     # other overlapping end of event
            #     n += 1
            if other_event == event:
                # From now on, intersections are counted as 'after'
                event_crossed = True
            elif self._time_intersection(
                time1=(event.begin, event.end),
                time2=(other_event.begin, other_event.end)
            ):
                if event_crossed:
                    n_after += 1
                else:
                    n_before += 1
    return n_before, n_after
# def _plot_event(self, ax: mpl.axes.Axes, event: Event, color) -> None:
#     intersections_before, intersections_after = self._n_intersections_event(event)
#     n_intersections = intersections_before + intersections_after + 1
#     y_width = 1 / n_intersections
#     ymin = intersections_before * y_width
#     ymax = ymin + y_width
#     # Show the block rectangle
#     ax.axvspan(
#         event.begin,
#         event.end,
#         ymin=ymin,
#         ymax=ymax,
#         facecolor=color,
#         edgecolor="black",
#         alpha=0.5,
#         picker=True
#     )
#     # Show the observation block title
#     x_min, x_max = ax.get_xlim()
#     text_pos = (event.begin + (event.end - event.begin)/2)
#     text_pos_mdate = mdates.date2num(text_pos.datetime)
#     if (x_min <= text_pos_mdate) & (text_pos_mdate < x_max):
#         ax.text(
#             x=text_pos.datetime,
#             y=ymin + y_width / 2,
#             s=event.name + f"\n{intersections_before} -- {intersections_after}",
#             horizontalalignment="center",
#             verticalalignment="center",
#             rotation=90,
#             color="black",
#             fontsize=8
#         )

# ============================================================= #
# ---------------------- workbook_to_ics ---------------------- #
# def workbook_to_ics()
# ============================================================= #
# ----------------------- excel_to_ics ------------------------ #
def excel_to_ics(filename: str, output: str = None) -> None:
    """Check the file names of an Excel to iCalendar conversion and
    create the (empty) calendar.

    Parameters
    ----------
    filename : str
        Name of the input Excel file, it must end with ".xlsx".
    output : str, optional
        Name of the output calendar file, it must end with ".ics".
        By default None, i.e., `filename` with its ".xlsx" extension
        swapped for ".ics".

    Raises
    ------
    ValueError
        If `filename` does not end with ".xlsx" or if `output` is given
        and does not end with ".ics".
    """
    # Checking input / output
    excel_extension = ".xlsx"
    calendar_extension = ".ics"
    if not filename.endswith(excel_extension):
        raise ValueError(f"{filename} does not end with '{excel_extension}'.")
    if output is None:
        # Only swap the trailing extension: str.replace would also rewrite
        # any other '.xlsx' found earlier in the path.
        output = filename[:-len(excel_extension)] + calendar_extension
    elif not output.endswith(calendar_extension):
        raise ValueError(f"{output} does not end with '{calendar_extension}'.")

    # Making the calendar
    # NOTE: within this function the name shadows the `calendar` module
    calendar = Calendar()
# tz = 'Europe/Paris'
# first_day = arrow.get("2022-02-14").replace(tzinfo=tz)
# last_day = arrow.get("2022-02-18").replace(tzinfo=tz)
# for day in arrow.Arrow.range('day', first_day, last_day):
#     event = Event()
#     event.name = "Working on the task"
#     event.begin = day.replace(hour=8).to('utc').datetime
#     event.end = day.replace(hour=10).to('utc').datetime
#     event.transparent = False
#     calendar.events.add(event)
#     event = Event()
#     event.name = "Continue on the task?"
#     event.begin = day.replace(hour=10).to('utc').datetime
#     event.end = day.replace(hour=11).to('utc').datetime
#     event.transparent = True
#     calendar.events.add(event)
# with open(output, "w") as wf:
#     wf.writelines(calendar.serialize_iter())