#! /usr/bin/python3
# -*- coding: utf-8 -*-
"""
*************
Parset reader
*************
"""
__author__ = 'Alan Loh'
__copyright__ = 'Copyright 2020, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
'_ParsetProperty',
'Parset',
'ParsetUser'
]
from os.path import abspath, isfile, join, basename, dirname
from collections.abc import MutableMapping
from copy import deepcopy
from typing import Tuple, Callable
import re
import json
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, AltAz, ICRS
import astropy.units as u
# from ipywidgets.widgets.widget_output import Output
import numpy as np
from nenupy import nenufar_position
from nenupy.instru import sb2freq
from nenupy.astro.target import SolarSystemTarget
from nenupy.observation import PARSET_OPTIONS
from nenupy.observation.sqldatabase import DuplicateParsetEntry, UserNameNotFound
import logging
log = logging.getLogger(__name__)
SB_WIDTH = 195.3125*u.kHz
# ============================================================= #
# ---------------------- _ParsetProperty ---------------------- #
# ============================================================= #
class _ParsetProperty(MutableMapping):
""" Class which mimics a dictionnary object, adapted to
store parset metadata per category. It understands the
different data types from raw strings it can encounter.
"""
def __init__(self, data=()):
self.mapping = {}
self.update(data)
def __getitem__(self, key):
return self.mapping[key]
def __delitem__(self, key):
del self.mapping[key]
def __setitem__(self, key, value):
"""
"""
value = value.replace('\n', '')
value = value.replace('"', '')
if value.startswith('[') and value.endswith(']'):
# This is a list
val = value[1:-1].split(',')
value = []
# Parse according to syntax
for i in range(len(val)):
if '..' in val[i]:
# This is a subband syntax
subBandSart, subBanStop = val[i].split('..')
value.extend(
list(
range(
int(subBandSart),
int(subBanStop) + 1
)
)
)
elif ':' in val[i]:
# Might be a time object
try:
item = Time(val[i].strip(), precision=0)
except ValueError:
item = val[i]
value.append(item)
elif val[i].isdigit():
# Integers (there are not list of floats)
value.append(int(val[i]))
else:
# A simple string
value.append(val[i])
elif value.lower() in ['on', 'enable', 'true']:
# This is a 'True' boolean
value = True
elif value.lower() in ['off', 'disable', 'false']:
# This is a 'False' boolean
value = False
elif 'angle' in key.lower():
# This is a float angle in degrees
value = float(value) * u.deg
elif value.isdigit():
value = int(value)
elif ':' in value:
# Might be a time object
try:
value = Time(value.strip(), precision=0)
except ValueError:
pass
else:
pass
# if key in self:
# del self[self[key]]
self.mapping[key] = value
def __iter__(self):
return iter(self.mapping)
def __len__(self):
return len(self.mapping)
def __repr__(self):
return f'{type(self).__name__}({self.mapping})'
# ============================================================= #
# ============================================================= #
# ============================================================= #
# ------------------------ _JsonEntry ------------------------- #
# ============================================================= #
def _parse_parameters(parameters: str, pulsar: bool = False) -> Tuple[str, dict]:
""" Parse values from the digital beam 'parameters'
entry.
E.g. 'TF: DF=3.05 DT=10.0 HAMM'
"""
parameters = parameters.lower()
mode = parameters.split(':')[0]
if pulsar:
configs = {
param.split('=')[0]: param.split('=')[1]\
for param in parameters.split('--')\
if '=' in param
}
configs.update({
param.rstrip(): True\
for param in parameters.split('--')\
if '=' not in param
})
else:
configs = {
param.split('=')[0]: param.split('=')[1]\
for param in parameters.split()\
if '=' in param
}
configs.update({
param.rstrip(): True\
for param in parameters.split('--')\
if '=' not in param
})
return mode, configs
def _array_to_dict_array(array: list, unit: str = "") -> list:
""" """
if unit != "":
return [
{"value": val, "unit": unit} for val in array
]
else:
return [
{"value": val} for val in array
]
def _get_pointing_center_dict(property: _ParsetProperty) -> dict:
""" Returns a RA, Dec whatever the pointing type is. """
def _constrain_angle(
angle: u.Quantity,
valmin: u.Quantity = 0.*u.deg,
valmax: u.Quantity = 90*u.deg
):
""" Constrain an angle between two values. """
if angle < valmin:
angle = valmin
elif angle > valmax:
angle = valmax
else:
pass
return angle
# Sort out the beam start and stop times
duration = TimeDelta(property['duration'] , format='sec')
start_time = property['startTime']
stop_time = (property['startTime'] + duration)
if "azelFile" in property:
# In case of pointing described by an azelfile
# it will be treated as a zenith pointing (wrong but best compromise for the database)
property["directionType"] = "azelgeo_azelfile"
# Deal with coordinates and pointing types
direction_type = property['directionType'].lower()
if direction_type == "j2000":
ra = property['angle1'].to(u.deg)
dec = property['angle2'].to(u.deg)
if ("decal_az" in property) or ("decal_el" in property):
altaz = SkyCoord(ra, dec).transform_to(
AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
)
radec = SkyCoord(
_constrain_angle(
altaz.az + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
altaz.alt + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
ra = radec.ra
dec = radec.dec
# Nothing else to do
decal_ra = float(property.get("decal_ra", 0.0))*u.deg
decal_dec = float(property.get("decal_dec", 0.0))*u.deg
right_ascension = _constrain_angle(
(ra + decal_ra).value,
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
(dec + decal_dec).value,
valmin=-90.,
valmax=90.
)
elif direction_type == "azelgeo":
# This is a transit observation, compute the mean RA/Dec
# Convert AltAz to RA/Dec
radec = SkyCoord(
_constrain_angle(
property['angle1'] + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
property['angle2'] + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
right_ascension = _constrain_angle(
radec.ra.deg + float(property.get("decal_ra", 0.0)),
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
radec.dec.deg + float(property.get("decal_dec", 0.0)),
valmin=-90.,
valmax=90.
)
elif direction_type == "azelgeo_azelfile":
# This observation was made using an azelfile
radec = SkyCoord(
0.*u.deg,
90*u.deg,
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
right_ascension = radec.ra.deg
declination = radec.dec.deg
elif direction_type == "natif":
# This is a test observation, unable to parse the RA/Dec
right_ascension = None
declination = None
else:
# Dealing with a Solar System source
solar_system_target = SolarSystemTarget.from_name(
name=direction_type,
time=start_time + duration/2.
)
radec = solar_system_target.coordinates
if ("decal_az" in property) or ("decal_el" in property):
altaz = solar_system_target.horizontal_coordinates[0]
radec = SkyCoord(
_constrain_angle(
altaz.az + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
altaz.alt + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
decal_ra = float(property.get("decal_ra", 0.0))*u.deg
decal_dec = float(property.get("decal_dec", 0.0))*u.deg
right_ascension = _constrain_angle(
radec.ra.deg + decal_ra.value,
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
radec.dec.deg + decal_dec.value,
valmin=-90.,
valmax=90.
)
return {
"ra": {
"value": right_ascension,
"unit": "deg"
},
"dec": {
"value": declination,
"unit": "deg"
},
"obs_direction_type": property["directionType"].lower()
}
def _get_time_dict(property: _ParsetProperty) -> dict:
""" """
# Sort out the beam start and stop times
duration = TimeDelta(property['duration'] , format='sec')
start_time = property['startTime']
stop_time = (property['startTime'] + duration)
return {
"startstop":
{
"gte": start_time.isot,
"lte": stop_time.isot
},
"duration": {
"value": np.round(duration.sec, 3),
"unit": "s"
}
}
def _get_frequency_dict(property: _ParsetProperty, field: str = "subbandList") -> dict:
""" """
subband_list = property[field]
# Find consecutive subbands groups:
subband_list_groups = np.split(
subband_list,
np.where(np.diff(subband_list) != 1)[0] + 1
)
return [
{
"value": {
"gte": sb2freq(group.min())[0].to(u.MHz).value,
"lt": (sb2freq(group.max()) + SB_WIDTH)[0].to(u.MHz).value,
},
"unit": "MHz"
} for group in subband_list_groups
]
def _default_setting(digibeam: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
return {
"name": "LaNewBa",
"dt": {
"value": 1,
"unit": "s"
},
"df": {
"value": SB_WIDTH.to(u.kHz).value,
"unit": "kHz"
},
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
def _pulsar_setting(digibeam: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
# Parse the parameters
try:
mode, config = _parse_parameters(digibeam["parameters"], pulsar=True)
except KeyError:
log.warning(
f"No 'parameters' for numerical beam {digibeam['noBeam']})."
)
return {}
# Fill out the receiver configuration depending on the observing mode
if mode == "fold":
return {
"name": "undysputed",
"mode": "pulsar_fold",
"source_name": config["src"].rstrip(),
"n_polars": 1 if config.get("onlyi", False) else 4,
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "single":
return {
"name": "undysputed",
"mode": "pulsar_single",
"source_name": config["src"].rstrip(),
"downsampling": int(config["dstime"]),
"n_polars": 1 if config.get("onlyi", False) else 4,
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "waveolaf":
return {
"name": "undysputed",
"mode": "pulsar_waveolaf",
"source_name": config["src"].rstrip(),
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "wave":
return {
"name": "undysputed",
"mode": "pulsar_wave",
"source_name": config["src"].rstrip(),
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
else:
log.warning(f"Pulsar mode '{mode}' not recognized.")
return {}
def _waveform_setting(digibeam: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
return {
"name": "undysputed",
"mode": "waveform",
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
def _dynamicspectrum_setting(digibeam: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
# Parse the parameters
try:
_, config = _parse_parameters(digibeam["parameters"], pulsar=False)
except KeyError:
log.warning(
f"No 'parameters' for numerical beam {digibeam['noBeam']}). Setting to default values."
)
# Set default value for the configuration
config = {
"dt": 5.00,
"df": 6.1
}
try:
if config.get("tf: rawrt", False):
# This shouldnt be the case though...
return _waveform_setting(digibeam, output, version)
return {
"name": "undysputed",
"mode": "tf",
"dt": {
"value": float(config["dt"]),
"unit": "ms"
},
"df": {
"value": float(config["df"]),
"unit": "kHz"
},
"frequency": _get_frequency_dict(digibeam, field="subbandList")
}
except KeyError:
log.warning(
f"Wrong '{digibeam['toDo']}' configuration: {config}."
)
return {}
def _nickel_setting(phasecenter: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
nickel_config = {
"name": "nickel",
"channelization": {
"value": output["nri_channelization"],
"unit": None
},
"dumptime": {
"value": output["nri_dumpTime"],
"unit": "s"
}
}
if version >= (1, 0):
# Parse the parameters
try:
mode, config = _parse_parameters(phasecenter["parameters"], pulsar=False)
log.warning("NICKEL parameters not taken into account. Needs to be implemented!")
except KeyError:
log.warning(
f"No 'parameters' for phase center {phasecenter['noBeam']})."
)
#return {}
nickel_config["frequency"] = _get_frequency_dict(phasecenter, "subbandList")
return nickel_config
else:
nickel_config["frequency"] = _get_frequency_dict(output, "nri_subbandList")
return nickel_config
def _tbd_setting(digibeam: _ParsetProperty, output: _ParsetProperty, version: tuple) -> dict:
if ("nickel" in output.get("nri_receivers", [])) and (version < (1, 0)):
return _nickel_setting(digibeam, output, version)
else:
return _default_setting(digibeam, output, version)
BEAM_SETTINGS = {
"none": _default_setting,
"pulsar": _pulsar_setting,
"waveform": _waveform_setting,
"dynamicspectrum": _dynamicspectrum_setting,
"tbd": _tbd_setting,
"nickel": _nickel_setting
}
class _JsonEntry:
def __init__(self, output: _ParsetProperty):
self.obs_metadata = {}
self.output = output
self.fovs = []
self.pointings = []
@property
def fov_indices(self) -> np.ndarray:
return np.array([fov["idx"] for fov in self.fovs])
@property
def data(self) -> dict:
""" """
# Fill the Field of Views with their associated pointings
fovs = self.fovs.copy()
for fov_idx, pointing in self.pointings:
fovs[fov_idx]["pointings"].append(pointing)
# Build the dictionnary of field of views
fov_dict = {
"field_of_views": fovs
}
# Return a dictionnary that can be transformed to JSON
return {**self.obs_metadata, **fov_dict}
def add_observation_metadata(self, observation: _ParsetProperty, parset_file: str, parset_user: str = "") -> None:
""" """
self.obs_metadata["@timestamp"] = observation["startTime"].isot
# Fill out observation tab
self.obs_metadata["file_name"] = {
"name": basename(parset_file),
"path": dirname(parset_file)
}
self.obs_metadata["time"] = {
"startstop":
{
"gte": observation["startTime"].isot,
"lt": observation["stopTime"].isot
},
"duration": {
"value": np.round((observation["stopTime"] - observation["startTime"]).sec, 3),
"unit": "s"
}
}
topic = observation.get('topic', 'LT00 DEBUG')
# Try to capture code and topic name using regular expression
pattern = r'^(?P<code>(ES([0]?[0-9]|1[0-7])|LT(?!08)([0]?[0-9]|1[0-3])|RP1[A-C]|SP(16|17))) (?P<name>\w+)$'
topic_decoded = re.match(pattern=pattern, string=topic)
if topic_decoded is None:
log.warning(f'{topic} not properly decoded!!')
self.obs_metadata["topic"] = {
'code': 'LT00',
'name': 'DEBUG'
}
else:
topic_decoded_dict = topic_decoded.groupdict()
self.obs_metadata["topic"] = {
'code': topic_decoded_dict['code'],
'name': topic_decoded_dict['name']
}
key_mapping = {
"title": "title",
"contactName": "contact_name",
"name": "name"
}
for key, value in observation.items():
if key in key_mapping:
self.obs_metadata[key_mapping[key]] = value
self.obs_metadata["parset_user"] = parset_user
def add_field_of_view(self, index: int, anabeam: _ParsetProperty) -> None:
""" """
fov = {}
fov["idx"] = index
fov["pointings"] = []
fov["name"] = anabeam["target"]
fov["center"] = _get_pointing_center_dict(anabeam)
fov["time"] = _get_time_dict(anabeam)
fov["beamsquint"] = {
"correction": anabeam.get("beamSquint", False),
"frequency": {
"value": anabeam.get("optFrq", None),
"unit": "MHz"
}
}
fov["mini_arrays"] = _array_to_dict_array(anabeam["maList"])
fov["antennas"] = _array_to_dict_array(anabeam["antList"])
fov["filter"] = [{"name": int(fil), "start": tim.isot} for fil, tim in zip(anabeam["filter"], anabeam["filterTime"])]
self.fovs.append(fov)
def add_pointing(self, index: int, beam: _ParsetProperty, parset_version: tuple, pointing_setting_func: Callable = None) -> None:
""" """
pointing = {}
# Mandatory keys
pointing["idx"] = index
pointing["name"] = beam["target"]
pointing["center"] = _get_pointing_center_dict(beam)
pointing["time"] = _get_time_dict(beam)
# Select the correct function to store the receiver configuration
if pointing_setting_func is None:
# Automatically choose the function
pointing_setting_func = BEAM_SETTINGS[beam.get("toDo", "none").lower()]
pointing["receiver"] = pointing_setting_func(beam, self.output, parset_version)
# Assign the FoV index to each pointing
fov_idx = np.where(self.fov_indices == beam["noBeam"])[0][0]
self.pointings.append((fov_idx, pointing))
def add_nickel_pointing(self, anabeam: _ParsetProperty, index: int) -> None:
""" Only for parset < 1.0 """
nickel_pointing = {}
# Mandatory keys
nickel_pointing["idx"] = index
nickel_pointing["name"] = anabeam["target"]
nickel_pointing["center"] = _get_pointing_center_dict(anabeam)
nickel_pointing["time"] = _get_time_dict(anabeam)
nickel_pointing.update(
_nickel_setting(anabeam, self.output, version=(0, 0))
)
# Add the pointing to the list, with its associated fov index
self.pointings.append((0, nickel_pointing))
def add_xst_pointings(self) -> None:
""" """
# Get the last pointing index, before adding more
last_pointing_idx = len(self.pointings) - 1
for i, fov in enumerate(self.fovs):
start = Time(fov["time"]["startstop"]["gte"])
duration = TimeDelta(fov["time"]["duration"]["value"], format="sec")
zenith = SkyCoord(
0, 90,
unit="deg",
frame=AltAz(
obstime=start + duration/2,
location=nenufar_position
)
).transform_to(ICRS)
# Prepare the pointing configuration
xst_pointing = {
"idx": last_pointing_idx + 1 + i,
"center": {
"ra": {
"value": zenith.ra.deg,
"unit": "deg"
},
"dec": {
"value": zenith.dec.deg,
"unit": "deg"
},
"obs_direction_type": "zenith_xst"
},
"name": "",
"time": fov["time"],
"receiver": {
"name": "LaNewBa",
"frequency": _get_frequency_dict(self.output, field="xst_sbList")
}
}
# Add the pointing to the list, with its associated fov index
self.pointings.append((i, xst_pointing))
def remove_unused_miniarrays(self) -> None:
""" """
for fov in self.fovs:
# Check if remote MA are there, loop out if not
mas_in_fov = np.array([ma_in_fov["value"] for ma_in_fov in fov["mini_arrays"]])
if not np.any(mas_in_fov > 96):
continue
# Find out the receivers used
for pointing in fov["pointings"]:
if "nickel" == pointing["receiver"]["name"]:
# Check if one of the associated pointings implies NICKEL
continue
# Remove the remote Mini-Arrays
remote_mas_in_fov_mask = mas_in_fov > 96
fov["mini_arrays"] = np.array(fov["mini_arrays"])[~remote_mas_in_fov_mask].tolist()
log.info(
f"Remote Mini-Arrays have been removed for 'field_of_view' #{fov['idx']} because no associated 'pointing' is using the NICKEL receiver."
)
def save_file(self, file_name: str) -> None:
""" Writes the JSON file. """
with open(file_name, 'w', encoding='utf-8') as wf:
json.dump(self.data, wf, ensure_ascii=False, indent=4)
log.info(f"'{file_name}' written.")
# ============================================================= #
# ============================================================= #
# ============================================================= #
# --------------------------- Parset -------------------------- #
# ============================================================= #
[docs]
class Parset(object):
"""
"""
def __init__(self, parset):
self.observation = _ParsetProperty()
self.output = _ParsetProperty()
self.anabeams = {} # dict of _ParsetProperty
self.digibeams = {} # dict of _ParsetProperty
self.phase_centers = {}
self.parset_user = ""
self.parset = parset
# --------------------------------------------------------- #
# --------------------- Getter/Setter --------------------- #
@property
def parset(self):
"""
"""
return self._parset
@parset.setter
def parset(self, p):
if not isinstance(p, str):
raise TypeError(
'parset must be a string.'
)
if not p.endswith('.parset'):
raise ValueError(
'parset file must end with .parset'
)
p = abspath(p)
if not isfile(p):
raise FileNotFoundError(
f'Unable to find {p}'
)
self._parset = p
self._decodeParset()
@property
def version(self) -> tuple:
""" """
version_str = self.observation.get("parsetVersion", "0")
if version_str == "":
version_tuple = (1, 0) # default version
else:
version_tuple = tuple(map(lambda x: int(x), version_str.split(".")))
return version_tuple
# --------------------------------------------------------- #
# ------------------------ Methods ------------------------ #
[docs]
def to_json(self, path_name: str = None):
""" """
parset_version = self.version
json_entry = _JsonEntry(output=self.output)
json_entry.add_observation_metadata(
observation=self.observation,
parset_file=self.parset,
parset_user=self.parset_user
)
# Parse and store every field of view = analog configurations
for ana_idx, anabeam in self.anabeams.items():
json_entry.add_field_of_view(ana_idx, anabeam)
# Parse and store every pointing = digital beam configurations
digi_idx = 0
for digi_idx, digibeam in self.digibeams.items():
json_entry.add_pointing(digi_idx, digibeam, parset_version)
# Parse and store every imaging pointing = phase center configurations
if parset_version >= (1, 0):
# These were introduced with parset version 1.0
for center_idx, phase_center in self.phase_centers.items():
pc_index = center_idx + digi_idx + 1
json_entry.add_pointing(pc_index, phase_center, parset_version)
# Add extra pointings in some specific cases
# If XST are used
if self.output.get("xst_userfile", False):
# Add a pointing per anabeam if XST data have been taken
json_entry.add_xst_pointings()
# If NICKEL is used, old parset versions
if parset_version < (1, 0):
if "nickel" in self.output.get("nri_receivers", []):
if "TBD" not in [beam["toDo"] for beam in self.digibeams.values()]:
if len(self.anabeams) > 1:
log.warning("Found more than 1 FoV. A NICKEL pointing is added for the first one ONLY.")
# Add a NICKEL pointing corresponding to the analog beam
index = len(json_entry.pointings)
json_entry.add_nickel_pointing(anabeam=self.anabeams[0], index=index)
# Remove un-necessary Mini-Arrays indices
json_entry.remove_unused_miniarrays()
# Save or not the data to a file
if path_name is not None:
json_file_name = basename(self.parset).replace(".parset", ".json")
json_file = join(path_name, json_file_name)
json_entry.save_file(file_name=json_file)
else:
return json_entry.data
[docs]
def to_json_old(self, path_name=None):
""" """
data = {}
data["@timestamp"] = self.observation["startTime"].isot
# Fill out observation tab
data["file_name"] = {
"name": basename(self.parset),
"path": dirname(self.parset)
}
data["time"] = {
"startstop":
{
"gte": self.observation["startTime"].isot,
"lt": self.observation["stopTime"].isot
},
"duration": {
"value": (self.observation["stopTime"] - self.observation["startTime"]).sec,
"unit": "s"
}
}
topic = self.observation.get("topic", "ES00 DEBUG")
data["topic"] = {
"code": topic[:4] if topic.startswith('ES') else "ES00",
"name": topic[5:] if topic.startswith('ES') else topic
}
key_mapping = {
"title": "title",
"contactName": "contact_name",
"name": "name"
# "contactEmail": "contact_email",
# "topic": "topic"
}
for key, value in self.observation.items():
if key in key_mapping:
data[key_mapping[key]] = value
# to_dos = [digibeam["toDo"] for digibeam in self.digibeams.values()]
receivers_used = self.output["hd_receivers"] + self.output.get("nri_receivers", [])
# to_dos = np.unique(to_dos)
# data["receivers"] = {"name": receiver_name for receiver_name in to_dos if receiver_name.lower() != "tbd"}
data["receivers"] = [{"name": receiver_name} for receiver_name in receivers_used]
# Fill out outputs
# data["output"] = {}
# for key, value in self.output.items():
# data_level, data_property = key.split("_")
# if data_level not in data["output"]:
# data["output"][data_level] = {}
# data["output"][data_level][data_property] = value
# Fill out field of views (= anabeams)
data["field_of_views"] = []
for ana_idx, anabeam in self.anabeams.items():
fov = {}
fov["idx"] = ana_idx
fov["pointings"] = []
fov["name"] = anabeam["target"]
fov["center"] = self._get_pointing_center_dict(anabeam)
fov["time"] = self._get_time_dict(anabeam)
fov["beamsquint"] = {
"correction": anabeam.get("beamSquint", False),
"frequency": {
"value": anabeam.get("optFrq", None),
"unit": "MHz"
}
}
# fov["mini_arrays"] = anabeam["maList"]
fov["mini_arrays"] = self._array_to_dict_array(anabeam["maList"])
# fov["antennas"] = anabeam["antList"]
fov["antennas"] = self._array_to_dict_array(anabeam["antList"])
fov["filter"] = [{"name": int(fil), "start": tim.isot} for fil, tim in zip(anabeam["filter"], anabeam["filterTime"])]
data["field_of_views"].append(fov)
fov_indices = np.array([fov["idx"] for fov in data["field_of_views"]])
for digi_idx, digibeam in self.digibeams.items():
pointing = {}
pointing['idx'] = digi_idx
pointing["name"] = digibeam["target"]
pointing["center"] = self._get_pointing_center_dict(digibeam)
pointing["time"] = self._get_time_dict(digibeam)
if "toDo" not in digibeam:
pointing["receiver"] = {
"name": "LaNewBa",
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
elif digibeam["toDo"].lower() == "pulsar":
try:
mode, config = self._parse_parameters(digibeam["parameters"], pulsar=True)
except KeyError:
log.warning(
f"Parset '{self.parset}' doesn't have any 'parameters' for numerical beam {digibeam['noBeam']})."
)
continue
if mode == "fold":
pointing["receiver"] = {
"name": "undysputed",
"mode": "pulsar_fold",
"source_name": config["src"],
"n_polars": 1 if config.get("onlyi", False) else 4,
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "single":
pointing["receiver"] = {
"name": "undysputed",
"mode": "pulsar_single",
"source_name": config["src"],
"downsampling": int(config["dstime"]),
"n_polars": 1 if config.get("onlyi", False) else 4,
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "waveolaf":
pointing["receiver"] = {
"name": "undysputed",
"mode": "pulsar_waveolaf",
"source_name": config["src"],
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
elif mode == "wave":
pointing["receiver"] = {
"name": "undysputed",
"mode": "pulsar_wave",
"source_name": config["src"],
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
else:
pointing["receiver_configuration"] = {}
elif digibeam["toDo"].lower() == "waveform":
pointing["receiver"] = {
"name": "undysputed",
"mode": "waveform",
#"source_name": config["src"],
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
elif digibeam["toDo"].lower() == "dynamicspectrum":
try:
_, config = self._parse_parameters(digibeam["parameters"], pulsar=False)
except KeyError:
log.warning(
f"Parset '{self.parset}' doesn't have any 'parameters' for numerical beam {digibeam['noBeam']})."
)
continue
try:
pointing["receiver"] = {
"name": "undysputed",
"mode": "tf",
"dt": {
"value": float(config["dt"]),
"unit": "ms"
},
"df": {
"value": float(config["df"]),
"unit": "kHz"
},
"frequency": self._get_frequency_dict(digibeam, field="subbandList")
}
except KeyError:
log.warning(
f"Parset '{self.parset}' has a wrong '{digibeam['toDo']}' configuration."
)
continue
elif (digibeam["toDo"].lower() == "tbd") and ("nickel" in self.output.get("nri_receivers", [])):
# elif digibeam["toDo"].lower() == "imaging": # to be implemented?
pointing["receiver"] = {
"name": "nickel",
"channelization": {
"value": self.output["nri_channelization"],
"unit": None
},
"dumptime": {
"value": self.output["nri_dumpTime"],
"unit": "s"
},
"frequency": self._get_frequency_dict(self.output, "nri_subbandList")
}
# Select the correct fov
idx = np.where(fov_indices == digibeam["noBeam"])[0][0]
associated_fov = data["field_of_views"][idx]
associated_fov["pointings"].append(pointing)
# Add a pointing per anabeam if XST data have been taken
if self.output.get("xst_userfile", False):
for i, fov in enumerate(data["field_of_views"]):
start = Time(fov["time"]["startstop"]["gte"])
duration = TimeDelta(fov["time"]["duration"]["value"], format="sec")
zenith = SkyCoord(
0, 90,
unit="deg",
frame=AltAz(
obstime=start + duration/2,
location=nenufar_position
)
).transform_to(ICRS)
try:
last_dig_idx = digi_idx
except:
last_dig_idx = -1
fov["pointings"].append(
{
"idx": last_dig_idx + 1 + i,
"center": {
"ra": {
"value": zenith.ra.deg,
"unit": "deg"
},
"dec": {
"value": zenith.dec.deg,
"unit": "deg"
},
"obs_direction_type": "zenith_xst"
},
"name": "",
"time": fov["time"],
"receiver": {
"name": "LaNewBa",
"frequency": self._get_frequency_dict(self.output, field="xst_sbList")
}
}
)
# Add a pointing per anabeam if NiCKEL data have been taken in // with undysputed
# to_dos = [digibeam["toDo"] for digibeam in self.digibeams.values()]
# if ("nickel" in self.output.get("nri_receivers", [])) & ("TBD" not in to_dos):
# for i, fov in enumerate(data["field_of_views"]):
# Remove the remote Mini-Arrays if they are not used
for i, fov in enumerate(data["field_of_views"]):
# Check if remote MA are there
mas_in_fov = np.array([ma_in_fov["value"] for ma_in_fov in fov["mini_arrays"]])
if not np.any(mas_in_fov > 96):
continue
# Find out the receivers used
receivers_in_fov = []
for pointing in fov["pointings"]:
receivers_in_fov.append(pointing["receiver"]["name"])
# Check if one of the associated pointings implies NICKEL
if "nickel" in receivers_in_fov:
continue
# Remove the remote Mini-Arrays
remote_mas_in_fov_mask = mas_in_fov > 96
fov["mini_arrays"] = np.array(fov["mini_arrays"])[~remote_mas_in_fov_mask].tolist()
log.info(
f"Remote Mini-Arrays have been removed for 'field_of_view' #{fov['idx']} because no associated 'pointing' is using the NICKEL receiver."
)
data['parset_user'] = self.parset_user
if path_name is not None:
# Write the JSON file
json_file_name = basename(self.parset).replace(".parset", ".json")
json_file = join(path_name, json_file_name)
with open(json_file, 'w', encoding='utf-8') as wf:
json.dump(data, wf, ensure_ascii=False, indent=4)
log.info(f"'{json_file}' written.")
else:
return data
[docs]
def add_to_database(self, data_base):#dataBaseName):
"""
data_base: ParsetDataBase
"""
parsetDB = data_base
try:
parsetDB.parset = self.parset
except DuplicateParsetEntry:
return
try:
parsetDB.add_row(
{**self.observation, **self.output}, # dict merging
desc='observation'
)
except UserNameNotFound:
return
for anaIdx in self.anabeams.keys():
parsetDB.add_row(
self.anabeams[anaIdx],
desc='anabeam'
)
for digiIdx in self.digibeams.keys():
parsetDB.add_row(
self.digibeams[digiIdx],
desc='digibeam'
)
log.info(
f'Parset {self.parset} added to database {data_base.name}'
)
# --------------------------------------------------------- #
# ----------------------- Internal ------------------------ #
def _decodeParset(self):
"""
"""
with open(self.parset, 'r') as file_object:
line = file_object.readline()
while line:
try:
dicoName, content = line.split('.', 1)
except ValueError:
# This is a blank line
pass
key, value = content.split('=', 1)
if line.startswith('Observation'):
self.observation[key] = value
elif line.startswith('Output'):
self.output[key] = value
elif line.startswith('AnaBeam'):
anaIdx = int(re.search(r'\[(\d*)\]', dicoName).group(1))
if anaIdx not in self.anabeams.keys():
self.anabeams[anaIdx] = _ParsetProperty()
self.anabeams[anaIdx]['anaIdx'] = str(anaIdx)
self.anabeams[anaIdx][key] = value
elif line.startswith('Beam'):
digiIdx = int(re.search(r'\[(\d*)\]', dicoName).group(1))
if digiIdx not in self.digibeams.keys():
self.digibeams[digiIdx] = _ParsetProperty()
self.digibeams[digiIdx]['digiIdx'] = str(digiIdx)
self.digibeams[digiIdx][key] = value
elif line.startswith('PhaseCenter'):
pcIdx = int(re.search(r'\[(\d*)\]', dicoName).group(1))
if pcIdx not in self.phase_centers.keys():
self.phase_centers[pcIdx] = _ParsetProperty()
self.phase_centers[pcIdx]['pcIdx'] = str(pcIdx)
self.phase_centers[pcIdx][key] = value
line = file_object.readline()
log.info(
f"Parset '{self._parset}' loaded."
)
try:
with open(self.parset + '_user', 'r') as file_object:
line = file_object.readline()
while line:
self.parset_user = self.parset_user + line
line = file_object.readline()
except Exception as e:
pass
return
@staticmethod
def _parse_parameters(parameters, pulsar=False):
""" Parse values from the digital beam 'parameters'
entry.
E.g. 'TF: DF=3.05 DT=10.0 HAMM'
"""
parameters = parameters.lower()
mode = parameters.split(':')[0]
if pulsar:
configs = {
param.split('=')[0]: param.split('=')[1]\
for param in parameters.split('--')\
if '=' in param
}
configs.update({
param.rstrip(): True\
for param in parameters.split('--')\
if '=' not in param
})
else:
configs = {
param.split('=')[0]: param.split('=')[1]\
for param in parameters.split()\
if '=' in param
}
configs.update({
param.rstrip(): True\
for param in parameters.split('--')\
if '=' not in param
})
return mode, configs
@staticmethod
def _array_to_dict_array(array: list, unit: str = "") -> list:
""" """
if unit != "":
return [
{"value": val, "unit": unit} for val in array
]
else:
return [
{"value": val} for val in array
]
@staticmethod
def _get_time_dict(property) -> dict:
""" """
# Sort out the beam start and stop times
duration = TimeDelta(property['duration'] , format='sec')
start_time = property['startTime']
stop_time = (property['startTime'] + duration)
# return {
# "start": start_time.isot,
# "stop": stop_time.isot,
# "duration": {
# "value": duration.sec,
# "unit": "s"
# }
# }
return {
"startstop":
{
"gte": start_time.isot,
"lte": stop_time.isot
},
"duration": {
"value": np.round(duration.sec, 3),
"unit": "s"
}
}
@staticmethod
def _get_frequency_dict(property, field="subbandList") -> dict:
""" """
subband_list = property[field]
# Find consecutive subbands groups:
subband_list_groups = np.split(
subband_list,
np.where(np.diff(subband_list) != 1)[0] + 1
)
# return {
# "value": [
# {
# "gte": sb2freq(group.min())[0].to(u.MHz).value,
# "lt": (sb2freq(group.max()) + SB_WIDTH)[0].to(u.MHz).value,
# } for group in subband_list_groups
# ],
# "unit": "MHz"
# }
return [
{
"value": {
"gte": sb2freq(group.min())[0].to(u.MHz).value,
"lt": (sb2freq(group.max()) + SB_WIDTH)[0].to(u.MHz).value,
},
"unit": "MHz"
} for group in subband_list_groups
]
@staticmethod
def _get_miniarray_dict(mini_arrays: np.ndarray) -> dict:
""" """
# Find consecutive Mini-Arrays groups
ma_groups = np.split(
mini_arrays,
np.where(np.diff(mini_arrays) != 1)[0] + 1
)
return {
"value": [
{
"gte": group[0],
"lte": group[-1]
} for group in ma_groups
],
"unit": ""
}
@staticmethod
def _get_pointing_center_dict(property) -> dict:
""" Returns a RA, Dec whatever the pointing type is. """
def _constrain_angle(
angle: u.Quantity,
valmin: u.Quantity = 0.*u.deg,
valmax: u.Quantity = 90*u.deg
):
""" Constrain an angle between two values. """
if angle < valmin:
angle = valmin
elif angle > valmax:
angle = valmax
else:
pass
return angle
# Sort out the beam start and stop times
duration = TimeDelta(property['duration'] , format='sec')
start_time = property['startTime']
stop_time = (property['startTime'] + duration)
if "azelFile" in property:
# In case of pointing described by an azelfile
# it will be treated as a zenith pointing (wrong but best compromise for the database)
property["directionType"] = "azelgeo_azelfile"
# Deal with coordinates and pointing types
direction_type = property['directionType'].lower()
if direction_type == "j2000":
ra = property['angle1'].to(u.deg)
dec = property['angle2'].to(u.deg)
if ("decal_az" in property) or ("decal_el" in property):
altaz = SkyCoord(ra, dec).transform_to(
AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
)
radec = SkyCoord(
_constrain_angle(
altaz.az + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
altaz.alt + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
ra = radec.ra
dec = radec.dec
# Nothing else to do
decal_ra = float(property.get("decal_ra", 0.0))*u.deg
decal_dec = float(property.get("decal_dec", 0.0))*u.deg
right_ascension = _constrain_angle(
(ra + decal_ra).value,
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
(dec + decal_dec).value,
valmin=-90.,
valmax=90.
)
elif direction_type == "azelgeo":
# This is a transit observation, compute the mean RA/Dec
# Convert AltAz to RA/Dec
radec = SkyCoord(
_constrain_angle(
property['angle1'] + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
property['angle2'] + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
right_ascension = _constrain_angle(
radec.ra.deg + float(property.get("decal_ra", 0.0)),
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
radec.dec.deg + float(property.get("decal_dec", 0.0)),
valmin=-90.,
valmax=90.
)
elif direction_type == "azelgeo_azelfile":
# This observation was made using an azelfile
radec = SkyCoord(
0.*u.deg,
90*u.deg,
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
right_ascension = radec.ra.deg
declination = radec.dec.deg
elif direction_type == "natif":
# This is a test observation, unable to parse the RA/Dec
right_ascension = None
declination = None
else:
# Dealing with a Solar System source
solar_system_target = SolarSystemTarget.from_name(
name=direction_type,
time=start_time + duration/2.
)
radec = solar_system_target.coordinates
if ("decal_az" in property) or ("decal_el" in property):
altaz = solar_system_target.horizontal_coordinates[0]
radec = SkyCoord(
_constrain_angle(
altaz.az + float(property.get("decal_az", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=360.*u.deg
),
_constrain_angle(
altaz.alt + float(property.get("decal_el", 0.0))*u.deg,
valmin=0.*u.deg,
valmax=90.*u.deg
),
frame=AltAz(
obstime=start_time + duration/2.,
location=nenufar_position
)
).transform_to(ICRS)
decal_ra = float(property.get("decal_ra", 0.0))*u.deg
decal_dec = float(property.get("decal_dec", 0.0))*u.deg
right_ascension = _constrain_angle(
radec.ra.deg + decal_ra.value,
valmin=0.,
valmax=360.
)
declination = _constrain_angle(
radec.dec.deg + decal_dec.value,
valmin=-90.,
valmax=90.
)
return {
"ra": {
"value": right_ascension,
"unit": "deg"
},
"dec": {
"value": declination,
"unit": "deg"
},
"obs_direction_type": property["directionType"].lower()
}
# ============================================================= #
# ============================================================= #
# ------------------------ ParsetUser ------------------------- #
# ============================================================= #
class _ParsetBlock:
"""
"""
def __init__(self, field):
self.field = field
self.configuration = deepcopy(PARSET_OPTIONS[self.field])
def __setitem__(self, key, value):
"""
"""
self._modify_properties(**{key: value})
def __getitem__(self, key):
"""
"""
return self.configuration[key]["value"]
@property
def fields(self):
""" Lists the available fields. """
return list(self.configuration.keys())
def _modify_properties(self, **kwargs):
"""
"""
for key, value in kwargs.items():
# If the key exists, it will be udpated
if key in self.configuration:
# If the value is an astropy.Time instance, the format is 'YYYY-MM-DDThh:mm:ssZ'
if isinstance(value, Time):
value.precision = 0
value = value.isot + "Z"
# Durations/exposures are expressed in seconds
elif isinstance(value, TimeDelta):
value = str(int(np.round(value.sec))) + "s"
# The boolean values needs to be translated to strings
elif isinstance(value, bool):
value = "true" if value else "false"
# Updates the key value
self.configuration[key]["value"] = value
self.configuration[key]["modified"] = True
# If the key doesn't exist a warning message is raised
else:
log.warning(
f"Key '{key}' is invalid. Available keys are: {self.configuration.keys()}."
)
def _write_block_list(self, index=None) -> str:
"""
"""
# Prints a counter that is shown regarding the beam indices
if index is not None:
counter = f"[{index}]"
else:
counter = ""
# Writes the parset blocks in the correct format
return "\n".join(
[f"{self.field}{counter}.{key}={val['value']}"
for key, val in self.configuration.items()
if (val['modified'] or val['required'])
])
# ============================================================= #
class _BeamParsetBlock(_ParsetBlock):
"""
"""
def __init__(self, field, **kwargs):
super().__init__(field=field)
self.index = 0
self._modify_properties(**kwargs)
def __str__(self):
return self._write_block_list(index=self.index)
def is_above_horizon(self) -> bool:
""" Checks that the numerical beam is pointed above the horizon. """
# beam_start_time = Time(self["startTime"], format="isot")
# beam_duration = self._get_duration()
return True
def _get_duration(self) -> TimeDelta:
""" Reads the 'duration' field and converts it to a TimeDelta instance. """
# Regex check to split the value and the unit
match = re.match(
pattern=r"(?P<value>\d+)(?P<unit>[smh])",
string=self["duration"]
)
value = float(match.group("value"))
# Prepares a dictionnary to convert unit to seconds
to_seconds = {
"s": 1,
"m": 60,
"h": 3600
}
conversion_factor = to_seconds[match.group("unit").lower()]
# Converts the value to seconds
seconds = value * conversion_factor
return TimeDelta(seconds, format="sec")
# ============================================================= #
class _NumericalBeamParsetBlock(_BeamParsetBlock):
"""
"""
def __init__(self, **kwargs):
super().__init__(field="Beam", **kwargs)
def __repr__(self) -> str:
return f"<NumericalBeam(target={self['target']}, index={self.index})>"
# ============================================================= #
class _PhaseCenterParsetBlock(_BeamParsetBlock):
"""
"""
def __init__(self, **kwargs):
super().__init__(field="PhaseCenter", **kwargs)
def __repr__(self) -> str:
return f"<PhaseCenter(target={self['target']}, index={self.index})>"
# ============================================================= #
class _AnalogBeamParsetBlock(_BeamParsetBlock):
"""
"""
def __init__(self, **kwargs):
super().__init__(field="Anabeam", **kwargs)
self.numerical_beams = []
self.phase_centers = []
def __repr__(self) -> str:
return f"<AnalogBeam(target={self['target']}, index={self.index})>"
def _add_numerical_beam(self, **kwargs) -> None:
"""
"""
self.numerical_beams.append(
_NumericalBeamParsetBlock(
**kwargs
)
)
def _add_phase_center(self, **kwargs) -> None:
""" """
self.phase_centers.append(
_PhaseCenterParsetBlock(
**kwargs
)
)
def _propagate_index(self) -> None:
"""
"""
for i, numbeam in enumerate(self.numerical_beams):
numbeam["noBeam"] = self.index
# ============================================================= #
class _OutputParsetBlock(_ParsetBlock):
"""
"""
def __init__(self, **kwargs):
super().__init__(field="Output")
self._modify_properties(**kwargs)
def __str__(self):
return self._write_block_list()
# ============================================================= #
class _ObservationParsetBlock(_ParsetBlock):
"""
"""
def __init__(self, **kwargs):
super().__init__(field="Observation")
self._modify_properties(**kwargs)
def __str__(self):
return self._write_block_list()
# ============================================================= #
[docs]
class ParsetUser:
""" Class that handles the formatting of a NenuFAR *parset_user* file.
:Example:
.. code-block:: python
from nenupy.observation import ParsetUser
# Create an instance of this class
p = ParsetUser()
# Add an analog beam
p.add_analog_beam(target="Analog beam 1")
# Add two numerical beams associated with this analog beam
p.add_numerical_beam(0, target="One")
p.add_numerical_beam(0, target="Two")
# Print the analog beam parset block
print(p.analog_beams[0])
# Validate the parset
p.validate()
# Write the file
p.write("my_parset.parset_user")
.. seealso::
NenuFAR `Parset_User guide <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ and the :ref:`tutorial <parset_user_doc>`.
.. rubric:: Attributes Summary
.. autosummary::
~ParsetUser.analog_beam_fields
~ParsetUser.numerical_beam_fields
~ParsetUser.phase_center_fields
~ParsetUser.observation_fields
~ParsetUser.output_fields
.. rubric:: Methods Summary
.. autosummary::
~ParsetUser.set_observation_config
~ParsetUser.set_output_config
~ParsetUser.add_analog_beam
~ParsetUser.add_numerical_beam
~ParsetUser.modify_analog_beam
~ParsetUser.modify_numerical_beam
~ParsetUser.remove_analog_beam
~ParsetUser.remove_numerical_beam
~ParsetUser.validate
~ParsetUser.write
.. rubric:: Attributes and Methods Documentation
"""
def __init__(self):
self.observation = _ObservationParsetBlock()
self.output = _OutputParsetBlock()
self.analog_beams = []
def __str__(self):
self._update_beam_numbers()
# Prepares the different text blocks
observation_text = str(self.observation)
output_text = str(self.output)
return "\n\n".join(
[observation_text,
self._analog_beams_str,
self._numerical_beams_str,
self._phase_center_str,
output_text]
)
@property
def analog_beam_fields(self) -> list:
""" Lists all the available *analog beam* fields. """
return _AnalogBeamParsetBlock().fields
@property
def numerical_beam_fields(self) -> list:
""" Lists all the available *numerical beam* fields. """
return _NumericalBeamParsetBlock().fields
@property
def phase_center_fields(self) -> list:
""" Lists all the available *phase center* fields. """
return _PhaseCenterParsetBlock().fields
@property
def observation_fields(self) -> list:
""" Lists all the available *observation* fields. """
return _ObservationParsetBlock().fields
@property
def output_fields(self) -> list:
""" Lists all the available *output* fields. """
return _OutputParsetBlock().fields
@property
def _analog_beams_str(self) -> str:
"""
"""
return "\n\n".join(
str(anabeam)
for anabeam in self.analog_beams
)
@property
def _numerical_beams_str(self) -> str:
"""
"""
return "\n\n".join(
str(numbeam)
for anabeam in self.analog_beams
for numbeam in anabeam.numerical_beams
)
@property
def _phase_center_str(self) -> str:
"""
"""
return "\n\n".join(
str(pc)
for anabeam in self.analog_beams
for pc in anabeam.phase_centers
)
[docs]
def set_observation_config(self, **kwargs) -> None:
""" Sets the configuration of the *parset_user* observation block.
This method ingests any valid `keyword argument <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ corresponding to an *Observation* configuration.
:Example:
.. code-block:: python
:emphasize-lines: 4
from nenupy.observation import ParsetUser
p = ParsetUser()
p.set_observation_config(
name="My observation",
contactName="AlanLoh",
contactEmail="alan.loh@obspm.fr",
topic="DEBUG"
)
"""
for key, value in kwargs.items():
self.observation[key] = value
[docs]
def set_output_config(self, **kwargs) -> None:
""" Sets the configuration of the *parset_user* output block.
This method ingests any valid `keyword argument <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ corresponding to an *Output* configuration.
:Example:
.. code-block:: python
:emphasize-lines: 4
from nenupy.observation import ParsetUser
p = ParsetUser()
p.set_output_config(
hd_bitMode=16,
hd_receivers="[undysputed]"
)
"""
for key, value in kwargs.items():
self.output[key] = value
[docs]
def add_analog_beam(self, **kwargs) -> None:
""" Adds an *analog beam* to the :attr:`~nenupy.observation.parset.ParsetUser.analog_beams` attribute and updates its index based on other *analog beams*.
This method ingests any valid `keyword argument <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ corresponding to an *Anabeam* configuration.
:Example:
.. code-block:: python
:emphasize-lines: 5
from nenupy.observation import ParsetUser
from astropy.time import Time, TimeDelta
p = ParsetUser()
p.add_analog_beam(
target="My fav target",
simbadSearch="Cygnus X-3",
trackingType="tracking",
duration=TimeDelta(3600, format="sec"), # or "3600s"
startTime=Time("2022-01-01 12:00:00") # or "2022-01-01T12:00:00Z"
)
"""
self.analog_beams.append(
_AnalogBeamParsetBlock(**kwargs)
)
self._updates_anabeams_indices()
[docs]
def modify_analog_beam(self, anabeam_index: int, **kwargs) -> None:
""" Modifies the configuration of the *analog beam*.
:param anabeam_index:
Index of the *analog beam* to modify.
:type anabeam_index:
`int`
:Example:
.. code-block:: python
:emphasize-lines: 5
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="First")
p.modify_analog_beam(0, target="Modified_Value")
"""
self.analog_beams[anabeam_index]._modify_properties(**kwargs)
[docs]
def remove_analog_beam(self, anabeam_index: int) -> None:
""" Removes an *analog beam* along with its associated *numerical beams* and *phase centers*.
:param anabeam_index:
Index of the *analog beam* to remove.
:type anabeam_index:
`int`
.. note::
One can quickly identify the indices of the *analog beams*:
.. code-block:: python
:emphasize-lines: 6
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="First")
p.add_analog_beam(target="Second")
p.analog_beams # prints: [<AnalogBeam(target=First, index=0)>, <AnalogBeam(target=Second, index=1)>]
:Example:
.. code-block:: python
:emphasize-lines: 6
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="First")
p.add_analog_beam(target="Second")
p.remove_analog_beam(anabeam_index=0) # removes the analog beam "First"
"""
del self.analog_beams[anabeam_index]
self._updates_anabeams_indices()
[docs]
def add_numerical_beam(self, anabeam_index: int = 0, **kwargs) -> None:
""" Adds a *numerical beam* to the *analog beam* '``anabeam_index``' and updates its index based on other *numerical beams*.
This method ingests any valid `keyword argument <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ corresponding to a *Beam* configuration.
:Example:
.. code-block:: python
:emphasize-lines: 11
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(
target="My fav target",
simbadSearch="Cygnus X-3",
trackingType="tracking",
duration="3600s",
startTime="2022-01-01T12:00:00Z"
)
p.add_numerical_beam(
anabeam_index=0,
target="My fav target",
useParentPointing=True,
subbandList="[200..300]"
)
"""
if anabeam_index >= len(self.analog_beams):
raise IndexError(
f"Requested analog beam index {anabeam_index} is out of range. Only {len(self.analog_beams)} analog beams are set."
)
anabeam = self.analog_beams[anabeam_index]
anabeam._add_numerical_beam(**kwargs)
anabeam._propagate_index()
self._updates_numbeams_indices()
[docs]
def modify_numerical_beam(self, numbeam_index: int, **kwargs) -> None:
""" Modifies the configuration of the *numerical beam*.
:param numbeam_index:
Index of the *numerical beam* to modify.
:type numbeam_index:
`int`
:Example:
.. code-block:: python
:emphasize-lines: 5
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="First")
p.add_numerical_beam(0, target="Initial_Value")
p.modify_numerical_beam(0, target="Modified_Value")
"""
counter = 0
for anabeam in self.analog_beams:
for i, _ in enumerate(anabeam.numerical_beams):
if counter==numbeam_index:
anabeam.numerical_beams[i]._modify_properties(**kwargs)
break
counter += 1
else:
continue
break
[docs]
def remove_numerical_beam(self, numbeam_index: int) -> None:
""" Removes a *numerical beam* and updates the *numerical beam* indices.
:param numbeam_index:
Index of the *numerical beam* to remove.
:type numbeam_index:
`int`
.. note::
One can quickly identify the indices of the numerical beams:
.. code-block:: python
:emphasize-lines: 9
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="Analog beam 1")
p.add_analog_beam(target="Analog beam 2")
p.add_numerical_beam(0, target="One")
p.add_numerical_beam(0, target="Two")
p.add_numerical_beam(1, target="Three")
[anabeam.numerical_beams for anabeam in p.analog_beams] # prints: [[<NumericalBeam(target=One, index=0)>, <NumericalBeam(target=Two, index=1)>], [<NumericalBeam(target=Three, index=2)>]]
:Example:
.. code-block:: python
:emphasize-lines: 6
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="Analog beam 1")
p.add_numerical_beam(0, target="One")
p.add_numerical_beam(0, target="Two")
p.remove_numerical_beam(numbeam_index=0) # removes the numerical beam "One"
"""
counter = 0
for anabeam in self.analog_beams:
for i, _ in enumerate(anabeam.numerical_beams):
if counter==numbeam_index:
del anabeam.numerical_beams[i]
break
counter += 1
else:
continue
break
self._updates_numbeams_indices()
[docs]
def add_phase_center(self, anabeam_index: int = 0, **kwargs) -> None:
""" Adds a *phase center* to the *analog beam* '``anabeam_index``' and updates its index based on other *phase centers*.
This method ingests any valid `keyword argument <https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html>`_ corresponding to a *PhaseCenter* configuration.
:Example:
.. code-block:: python
:emphasize-lines: 11
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(
target="My fav target",
simbadSearch="Cygnus X-3",
trackingType="tracking",
duration="3600s",
startTime="2022-01-01T12:00:00Z"
)
p.add_phase_center(
anabeam_index=0,
target="My fav target",
useParentPointing=True,
subbandList="[200..300]"
)
"""
if anabeam_index >= len(self.analog_beams):
raise IndexError(
f"Requested analog beam index {anabeam_index} is out of range. Only {len(self.analog_beams)} analog beams are set."
)
anabeam = self.analog_beams[anabeam_index]
anabeam._add_phase_center(**kwargs)
anabeam._propagate_index()
self._updates_phasecenter_indices()
[docs]
def modify_phase_center(self, phasecenter_index: int, **kwargs) -> None:
""" Modifies the configuration of the *phase center*.
:param phasecenter_index:
Index of the *phase center* to modify.
:type phasecenter_index:
`int`
:Example:
.. code-block:: python
:emphasize-lines: 5
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="First")
p.add_phase_center(0, target="Initial_Value")
p.modify_phase_center(0, target="Modified_Value")
"""
counter = 0
for anabeam in self.analog_beams:
for i, _ in enumerate(anabeam.phase_centers):
if counter==phasecenter_index:
anabeam.phase_centers[i]._modify_properties(**kwargs)
break
counter += 1
else:
continue
break
[docs]
def remove_phase_center(self, phasecenter_index: int) -> None:
""" Removes a *phase center* and updates the *phase center* indices.
:param phasecenter_index:
Index of the *phase center* to remove.
:type phasecenter_index:
`int`
.. note::
One can quickly identify the indices of the phase centers:
.. code-block:: python
:emphasize-lines: 9
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="Analog beam 1")
p.add_analog_beam(target="Analog beam 2")
p.add_phase_center(0, target="One")
p.add_phase_center(0, target="Two")
p.add_phase_center(1, target="Three")
[anabeam.phase_centers for anabeam in p.analog_beams] # prints: [[<PhaseCenter(target=One, index=0)>, <PhaseCenter(target=Two, index=1)>], [<PhaseCenter(target=Three, index=2)>]]
:Example:
.. code-block:: python
:emphasize-lines: 6
from nenupy.observation import ParsetUser
p = ParsetUser()
p.add_analog_beam(target="Analog beam 1")
p.add_phase_center(0, target="One")
p.add_phase_center(0, target="Two")
p.remove_phase_center(phasecenter_index=0) # removes the phase center "One"
"""
counter = 0
for anabeam in self.analog_beams:
for i, _ in enumerate(anabeam.phase_centers):
if counter==phasecenter_index:
del anabeam.phase_centers[i]
break
counter += 1
else:
continue
break
self._updates_phasecenter_indices()
[docs]
def validate(self) -> bool:
""" Validates the syntax of each field.
:returns:
``True`` if all the checked keys have a correct syntax.
:rtype:
`bool`
.. warning::
This is merely just a syntax validation made with regular expressions.
The relevance of the parameter values are not checked at all.
"""
is_valid = True
# Update the beam numbers on the Observation table
self._update_beam_numbers()
# Check that the beams are above the horizon during the course of the observation
for anabeam in self.analog_beams:
if not anabeam.is_above_horizon():
log.warning("")
for numbeam in anabeam.numerical_beams:
if not numbeam.is_above_horizon():
log.warning("")
for pc in anabeam.phase_centers:
if not pc.is_above_horizon():
log.warning("")
def check_keys(dictionnary: dict) -> bool:
all_keys_ok = True
# Check each key and the corresponding regex syntax
for key in dictionnary:
# Get the regex syntax and if it doesn't exist, go to the next key
try:
syntax_pattern = dictionnary[key]['syntax']
except KeyError:
continue
# Don't check the key if it has not been modified
if not dictionnary[key]["modified"]:
continue
# Retrieve the value that needs to be checked
value = dictionnary[key]["value"]
if str(value) == '':
log.warning(f"Empty value for key '{key}'.")
# Perform a regex full match check, send a warning if invalid
if re.fullmatch(pattern=syntax_pattern, string=str(value)) is None:
log.error(
f"Syntax error on '{value}' (key '{key}'). Please check https://doc-nenufar.obs-nancay.fr/UsersGuide/parsetFileuserparset_user.html"
)
all_keys_ok &= False
return all_keys_ok
# Check all configurations
log.info("Checking 'observation' block...")
is_valid &= check_keys(self.observation.configuration)
log.info("Checking 'output' block...")
is_valid &= check_keys(self.output.configuration)
for anabeam in self.analog_beams:
log.info(f"Checking 'anabeam[{anabeam.index}]' block...")
is_valid &= check_keys(anabeam.configuration)
for numbeam in anabeam.numerical_beams:
log.info(f"Checking 'beam[{numbeam.index}]' block...")
is_valid &= check_keys(numbeam.configuration)
for pc in anabeam.phase_centers:
log.info(f"Checking 'phasecenter[{pc.index}]' block...")
is_valid &= check_keys(pc.configuration)
return is_valid
[docs]
def write(self, file_name: str) -> None:
""" Writes the current instance of :class:`~nenupy.observation.parset.ParsetUser`
to a file called ``file_name``.
:param file_name:
Name of the *parset_user* file to write.
The file extension must be ``".parset_user"``.
:type file_name:
`str`
"""
if not file_name.endswith(".parset_user"):
raise ValueError(f"file_name='{file_name}' does not end with '.parset_user'.")
with open(file_name, "w") as wfile:
wfile.write(str(self))
log.debug(f"Parset written in file {file_name}.")
def _updates_numbeams_indices(self) -> None:
""" Updates the indices of numerical beams. """
numbeams_counter = 0
for anabeam in self.analog_beams:
for numbeam in anabeam.numerical_beams:
numbeam.index = numbeams_counter
numbeams_counter += 1
def _updates_phasecenter_indices(self) -> None:
""" Updates the indices of phase centers. """
pc_counter = 0
for anabeam in self.analog_beams:
for pc in anabeam.phase_centers:
pc.index = pc_counter
pc_counter += 1
def _updates_anabeams_indices(self) -> None:
""" Updates the indices of analog beams. """
anabeams_counter = 0
for anabeam in self.analog_beams:
anabeam.index = anabeams_counter
anabeam._propagate_index()
anabeams_counter += 1
self._updates_numbeams_indices()
def _update_beam_numbers(self) -> None:
""" Updates the number of analog, numerical beams and phase centers. """
nb_analog_beams = len(self.analog_beams)
nb_numerical_beams = sum(len(anabeam.numerical_beams) for anabeam in self.analog_beams)
nb_phase_centers = sum(len(anabeam.phase_centers) for anabeam in self.analog_beams)
self.observation["nrAnabeams"] = str(nb_analog_beams)
self.observation["nrBeams"] = str(nb_numerical_beams)
self.observation["nrPhaseCenters"] = str(nb_phase_centers)
# ============================================================= #
# ============================================================= #