Source code for nenupy.observation.obs_config

#! /usr/bin/python3
# -*- coding: utf-8 -*-


"""
    *********************************
    Observation configuration classes
    *********************************
"""


__author__ = 'Alan Loh, Baptiste Cecconi'
__copyright__ = 'Copyright 2020, nenupy'
__credits__ = ['Alan Loh']
__maintainer__ = 'Alan'
__email__ = 'alan.loh@obspm.fr'
__status__ = 'Production'
__all__ = [
    'BSTConfig',
    'NICKELConfig',
    'TFConfig',
    'RAWConfig',
    'PulsarFoldConfig',
    'PulsarWaveConfig',
    'PulsarSingleConfig',
    'ObsConfig'
]


import astropy.units as u
from astropy.time import Time, TimeDelta
import numpy as np

from nenupy.observation import Parset
from nenupy.miscellaneous import accepts

import logging
log = logging.getLogger(__name__)


backendProperties = {
    'nickel': {
        'nSubBands': {
            'min': 1,
            'max': 384,
            'default': 384,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
        'nChannels': {
            'min': 1,
            'max': 64,
            'default': 64,
            'type': '`int`',
            'desc': 'Number of channels'
        },
        'nPolars': {
            'min': 1,
            'max': 4,
            'default': 4,
            'type': '`int`',
            'desc': 'Number of polarizations'
        },
        'nMAs': {
            'min': 2,
            'max': 96 + 6,
            'default': 96,
            'type': '`int`',
            'desc': 'Number of Mini-Arrays'
        },
        'timeRes': {
            'min': 0.,
            'max': 10.,
            'default': 1.,
            'type': '`float` or :class:`~astropy.time.TimeDelta`',
            'desc': 'Time resolution in seconds'
        }
    },
    'raw': {
        'nPolars': {
            'min': 1,
            'max': 4,
            'default': 4,
            'type': '`int`',
            'desc': 'Number of polarizations'
        },
        'nSubBands': {
            'min': 1,
            'max': 192,
            'default': 192,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
        'nBits': {
            'min': 8,
            'max': 16,
            'default': 8,
            'type': '`int`',
            'desc': 'Number of bits on which are recorded data elements'
        }
    },
    'tf': {
        'nPolars': {
            'min': 1,
            'max': 4,
            'default': 4,
            'type': '`int`',
            'desc': 'Number of polarizations'
        },
        'timeRes': {
            'min': (0.32*u.ms).to(u.s).value,
            'max': (83.89*u.ms).to(u.s).value,
            'default': (5.00*u.ms).to(u.s).value,
            'type': '`float` or :class:`~astropy.time.TimeDelta`',
            'desc': 'Time resolution in seconds'
        },
        'freqRes': {
            'min': (0.095*u.kHz).to(u.Hz).value,
            'max': (12.21*u.kHz).to(u.Hz).value,
            'default': (6.10*u.kHz).to(u.Hz).value,
            'type': '`float` or :class:`~astropy.units.Quantity`',
            'desc': 'Frequency resolution in Hz'
        },
        'nSubBands': {
            'min': 1,
            'max': 768,
            'default': 768,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
    },
    'bst': {
        'nSubBands': {
            'min': 1,
            'max': 768,
            'default': 768,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
        'nPolars': {
            'min': 1,
            'max': 2,
            'default': 2,
            'type': '`int`',
            'desc': 'Number of polarizations'
        }
    },
    # 'sst': {},
    'pulsar_fold': {
        'nSubBands': {
            'min': 1,
            'max': 192,
            'default': 192,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
        'nPolars': {
            'min': 1,
            'max': 4,
            'default': 4,
            'type': '`int`',
            'desc': 'Number of polarizations'
        },
        'tFold': {
            'min': 5.36870912,
            'max': 21.47483648,
            'default': 10.73741824,
            'type': '`float` or :class:`~astropy.time.TimeDelta`',
            'desc': 'Pulsar time fold in seconds'
        },
        'nBins': {
            'min': 16,
            'max': 8096,
            'default': 2048,
            'type': '`int`',
            'desc': 'Number of bins'
        }
    },
    'pulsar_waveolaf': {
        'nSubBands': {
            'min': 1,
            'max': 192,
            'default': 192,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        }
    },
    'pulsar_single': {
        'nSubBands': {
            'min': 1,
            'max': 192,
            'default': 192,
            'type': '`int`',
            'desc': 'Number of sub-bands'
        },
        'nPolars': {
            'min': 1,
            'max': 4,
            'default': 4,
            'type': '`int`',
            'desc': 'Number of polarizations'
        },
        'dsTime': {
            'min': 1,
            'max': 4096,
            'default': 128,
            'type': '`int`',
            'desc': 'Downsampling'
        },
        'nBits': {
            'min': 8,
            'max': 64,
            'default': 32,
            'type': '`int`',
            'desc': 'Number of bits on which are recorded data elements'
        }
    }
}


complex64 = np.complex64().itemsize * u.byte
float32 = np.float32().itemsize * u.byte


def doc(docstring, backend):
    prop = backendProperties[backend]
    paramDoc = ''
    for key in prop.keys():
        param = prop[key]
        paramDoc += f"""
            :param {key}:
                {param['desc']} (min: ``{param['min']}``,
                max: ``{param['max']}``, default: ``{param['default']}``).
            :type {key}: {param['type']}
        """
    paramDoc += """
            :param durationSec:
                Observation duration in seconds (default: ``0``).
            :type durationSec: `int` or :class:`~astropy.time.TimeDelta`

            .. versionadded:: 1.2.0
    """
    def document(func):
        func.__doc__ = docstring + '\n' + paramDoc
        return func
    return document


# ============================================================= #
# ---------------------- _BackendConfig ----------------------- #
# ============================================================= #
class _BackendConfig(object):
    """
        .. versionadded:: 1.2.0
    """

    def __init__(self, backend, **kwargs):
        self.startTime = kwargs.get(
            'startTime',
            Time.now()
        )
        self.durationSec = kwargs.get(
            'durationSec',
            0
        )
        self._backend = backend

        # Catch the irrelevant kwargs
        for attr in kwargs.keys():
            if attr.startswith('_'):
                raise AttributeError(
                    "Attribute '{}' starting with '_' cannot be set.".format(
                        attr
                    )
                )
            if attr not in dir(self):
                raise AttributeError(
                    "'{}' object has no attribute '{}'".format(
                        self.__class__.__name__,
                        attr
                    )
                )

        # Fill the relevant kwargs
        for attr in backendProperties[self._backend].keys():
            setattr(
                self,
                attr,
                kwargs.get(
                    attr,
                    backendProperties[self._backend][attr]['default']
                )
            )


    def __str__(self):
        className = self.__class__.__name__
        title = "Backend configuration of type '{}'\n".format(className)
        attributes = backendProperties[self._backend].keys()
        properties = '\tProperties: '
        for att in attributes:
            properties += "'{}={}', ".format(att, getattr(self, att))
        properties += "'{}={}', ".format('durationSec', getattr(self, 'durationSec'))
        properties = properties[:-2] # remove the last coma
        return title + properties


    # --------------------------------------------------------- #
    # --------------------- Getter/Setter --------------------- #
    @property
    def nSubBands(self):
        """
        """
        return self._nSubBands
    @nSubBands.setter
    @accepts(object, int)
    def nSubBands(self, nsb):
        self._nSubBands = self._checkAttr(
            key='nSubBands',
            value=nsb,
            name='sub-bands'
        )


    @property
    def nChannels(self):
        """
        """
        return self._nChannels
    @nChannels.setter
    @accepts(object, int)
    def nChannels(self, chan):
        self._nChannels = self._checkAttr(
            key='nChannels',
            value=chan,
            name='channels'
        )


    @property
    def nBins(self):
        """
        """
        return self._nBins
    @nBins.setter
    @accepts(object, int)
    def nBins(self, bins):
        self._nBins = self._checkAttr(
            key='nBins',
            value=bins,
            name='bins'
        )


    @property
    def nPolars(self):
        """
        """
        return self._nPolars
    @nPolars.setter
    @accepts(object, int)
    def nPolars(self, np):
        self._nPolars = self._checkAttr(
            key='nPolars',
            value=np,
            name='polarizations'
        )


    @property
    def timeRes(self):
        """
        """
        return self._timeRes
    @timeRes.setter
    @accepts(object, (float, int, TimeDelta))
    def timeRes(self, dt):
        if isinstance(dt, TimeDelta):
            dt = dt.sec
        self._timeRes = self._checkAttr(
            key='timeRes',
            value=dt,
            name='time resolution'
        )


    @property
    def tFold(self):
        """
        """
        return self._tFold
    @tFold.setter
    @accepts(object, (float, int, TimeDelta))
    def tFold(self, tfold):
        if isinstance(tfold, TimeDelta):
            tfold = tfold.sec
        self._tFold = self._checkAttr(
            key='tFold',
            value=tfold,
            name='pulsar time fold'
        )


    @property
    def dsTime(self):
        """ Downsampling, can take values in [1, 2, 4, 8, 16, 32, 64, 128]
        """
        return self._dsTime
    @dsTime.setter
    @accepts(object, int)
    def dsTime(self, ds):
        # Check that ds is a power of 2:
        is2pow = (ds & (ds-1) == 0) and ds != 0
        if not is2pow:
            raise ValueError(
                "`dsTime` takes only power of two integer values."
            )
        self._dsTime = self._checkAttr(
            key='dsTime',
            value=ds,
            name='pulsar downsampling'
        )


    @property
    def nBits(self):
        """
        """
        return self._nBits
    @nBits.setter
    @accepts(object, int)
    def nBits(self, n):
        # Check that n is a power of 2:
        is2pow = (n & (n-1) == 0) and n != 0
        if not is2pow:
            raise ValueError(
                "`nBits` takes only power of two integer values."
            )
        self._nBits = self._checkAttr(
            key='nBits',
            value=n,
            name='bits'
        )


    @property
    def freqRes(self):
        """
        """
        return self._freqRes
    @freqRes.setter
    @accepts(object, (float, int, u.Quantity))
    def freqRes(self, df):
        if isinstance(df, u.Quantity):
            df = df.to(u.Hz).value
        self._freqRes = self._checkAttr(
            key='freqRes',
            value=df,
            name='frequency resolution'
        )


    @property
    def nMAs(self):
        """
        """
        return self._nMAs
    @nMAs.setter
    @accepts(object, int)
    def nMAs(self, ma):
        self._nMAs = self._checkAttr(
            key='nMAs',
            value=ma,
            name='Mini-Arrays'
        )


    @property
    def durationSec(self):
        """
        """
        return self._durationSec
    @durationSec.setter
    @accepts(object, (float, int, TimeDelta), strict=False)
    def durationSec(self, s):
        if isinstance(s, TimeDelta):
            s = s.sec
        self._durationSec = s


    # --------------------------------------------------------- #
    # ------------------------ Methods ------------------------ #


    # --------------------------------------------------------- #
    # ----------------------- Internal ------------------------ #
    def _checkAttr(self, key, value, name):
        attrProp = backendProperties[self._backend][key]
        minVal = attrProp['min']
        maxVal = attrProp['max']
        defaultVal = attrProp['default']
        if value > maxVal:
            log.warning(
                f"Maximal value of {name} is {maxVal} (<{value}). Setting to default '{key}={defaultVal}'."
            )
            value = defaultVal
        elif value < minVal:
            log.warning(
                f"Minimal value for {name} is {minVal} (>{value}). Setting to default '{key}={defaultVal}'."
            )
            value = defaultVal
        return value
# ============================================================= #
# ============================================================= #


# ============================================================= #
# ------------------------- BSTConfig ------------------------- #
# ============================================================= #
[docs] @doc('*Beamlet Statistics* observation configuration.', 'bst') class BSTConfig(_BackendConfig): def __init__(self, **kwargs): super().__init__(backend='bst', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of a *BST* FITS file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import BSTConfig >>> bstconf = BSTConfig( durationSec=3600 ) >>> bstconf.volume 21.09375 Mibyte >>> from nenupy.observation import BSTConfig >>> bstconf = BSTConfig.fromParset( 'nenufar_obs.parset' ) >>> bstconf.volume XXX Mibyte .. warning:: The data volume estimation does not handle specificities of the FITS file in which the *BST* are stored (in particular metadata and FITS architecture). Therefore, the volume may be underestimated by a few MB. """ log.debug(str(self)) nElements = self.nPolars * self.durationSec * self.nSubBands return (nElements * float32).to(u.Mibyte) # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.BSTConfig` instance in which *BST* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *BST* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.BSTConfig` :Example: >>> from nenupy.observation import BSTConfig >>> bstconf = BSTConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) dbeams = parset.digibeams # Find out the total duration of observation # Loop over the digibeams, as they can be simultaneous totalTimes = np.array([]) for db in dbeams.keys(): dts = TimeDelta( np.arange(dbeams[db]['duration']), format='sec' ) dbTimes = dbeams[db]['startTime'] + dts totalTimes = np.union1d(totalTimes, dbTimes.jd) return BSTConfig( durationSec=totalTimes.size, startTime=parset.observation['startTime'] )
# ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- NICKELConfig ------------------------ # # ============================================================= #
[docs] @doc('*NICKEL* correlator observation configuration.', 'nickel') class NICKELConfig(_BackendConfig): def __init__(self, **kwargs): super().__init__(backend='nickel', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of a *NICKEL* Measurement Set. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import NICKELConfig >>> nriconf = NICKELConfig( nMAs=56, nSubBands=244, nChannels=64, timeRes=1, durationSec=3600 ) >>> nriconf.volume.to('Tibyte') 2.6112914 Tibyte >>> from nenupy.observation import NICKELConfig >>> nriconf = NICKELConfig.fromParset( 'nenufar_obs.parset' ) >>> nriconf.volume XXX Gibyte .. warning:: The data volume estimation does not handle specificities of the Measurement Set. Therefore, the volume may be underestimated. """ log.debug(str(self)) nBaselines = self.nMAs * (self.nMAs - 1)/2 + self.nMAs visVolume = self.nPolars * nBaselines * complex64 ratePerSB = visVolume * self.nChannels / self.timeRes ratePerObs = ratePerSB * self.nSubBands return (ratePerObs * self.durationSec).to(u.Gibyte) # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.NICKELConfig` instance in which *NICKEL* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *NICKEL* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.NICKELConfig` :Example: >>> from nenupy.observation import NICKELConfig >>> nriconf = NICKELConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output anabeams = parset.anabeams if 'nri_receivers' not in out.keys(): # Nickel receiver has not been used return NICKELConfig( startTime=parset.observation['startTime'] ) elif 'nickel' not in out['nri_receivers']: # Nickel receiver has not been used return NICKELConfig( startTime=parset.observation['startTime'] ) # Hypothesis that only one analog beam is used! return NICKELConfig( durationSec=anabeams[0]['duration'], timeRes=out['nri_dumpTime'], nSubBands=len(out['nri_subbandList']), nChannels=out['nri_channelization'], nMAs=len(anabeams[0]['maList']), startTime=parset.observation['startTime'] )
# ============================================================= # # ============================================================= # # ============================================================= # # --------------------- _UnDySPuTeDConfig --------------------- # # ============================================================= # class _UnDySPuTeDConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, backend, **kwargs): super().__init__(backend=backend, **kwargs) def __str__(self): className = self.__class__.__name__ title = "Backend configuration of type '{}'\n".format(className) properties = '' for i, beam in enumerate(self._beamConfigs): attributes = backendProperties[beam._backend].keys() properties += '\tBeam {} Properties: '.format(i) for att in attributes: properties += "'{}={}', ".format(att, getattr(beam, att)) properties += "'{}={}', \n".format('durationSec', getattr(beam, 'durationSec')) properties = properties[:-4] # remove the last line skip and coma return title + properties # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # def _parseParameters(self, parameters, pulsar=False): """ Parse values from the digital beam 'parameters' entry. E.g. 'TF: DF=3.05 DT=10.0 HAMM' """ parameters = parameters.lower() mode = parameters.split(':')[0] if pulsar: configs = { param.split('=')[0]: param.split('=')[1]\ for param in parameters.split('--')\ if '=' in param } configs.update({ param.rstrip(): True\ for param in parameters.split('--')\ if '=' not in param }) else: configs = { param.split('=')[0]: param.split('=')[1]\ for param in parameters.split()\ if '=' in param } configs.update({ param.rstrip(): True\ for param in parameters.split('--')\ if '=' not in param }) return mode, configs # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod @accepts((float, int, u.Quantity), (float, int, u.Quantity)) def _checkDFvsDT(dt, df): """ Short time resolutions are impossible for narrow frequency resolutions. Some df/dt combinations are therefore not allowed. """ if isinstance(dt, u.Quantity): dt = dt.to(u.s).value if isinstance(df, u.Quantity): df = df.to(u.Hz).value allowedFftlen = 2**( np.arange(8) + 4 ) allowedNfft2int = 2**(np.arange(9) + 2) # Find the closest fftlen to the desired df value fftlen = allowedFftlen[ np.argmin( np.abs(allowedFftlen - 1.0 / 5.12e-6 / df) # 1/(5.12e-6 s) = 195312.5 Hz ) ] # Find the closest nfft2int to the desired df value nfft2int = allowedNfft2int[ np.argmin( np.abs(allowedNfft2int - dt / (5.12e-6 * fftlen)) ) ] dtEff = 5.12e-6 * fftlen * nfft2int dfEff = 1.0 / 5.12e-6 / fftlen log.debug( "'freqRes={0:.2f}', 'timeRes={1:.2f}' <--> 'df={2:.2f}', 'dt={3:.2f}' ('fftlen={4}', 'nfft2int={5}')".format( (df*u.Hz).to(u.kHz), (dt*u.s).to(u.ms), (dfEff*u.Hz).to(u.kHz), (dtEff*u.s).to(u.ms), fftlen, nfft2int ) ) return dtEff, dfEff # ============================================================= # # ============================================================= # # ============================================================= # # ----------------------- _TFBeamConfig ----------------------- # # ============================================================= # class _TFBeamConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, **kwargs): super().__init__(backend='tf', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ """ log.debug(str(self)) ratePerSB = self.nPolars * float32 * (200.e6/1024./self.freqRes) / self.timeRes rateObs = ratePerSB * self.nSubBands return (rateObs * self.durationSec).to(u.Gibyte) # ============================================================= # # ============================================================= # # ============================================================= # # ------------------------- TFConfig -------------------------- # # ============================================================= #
[docs] @doc('*UnDySPuTeD Time-Frequency* mode observation configuration.', 'tf') class TFConfig(_UnDySPuTeDConfig): def __init__(self, _setFromParset=False, **kwargs): if not _setFromParset: super().__init__(backend='tf', **kwargs) self.timeRes, self.freqRes = self._checkDFvsDT( dt=self.timeRes, df=self.freqRes ) kwargs['timeRes'] = self.timeRes kwargs['freqRes'] = self.freqRes self._beamConfigs = [ _TFBeamConfig(**kwargs) ] else: self._beamConfigs = [] # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of an *UnDySPuTeD-TF* observation file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import TFConfig >>> tfconf = TFConfig( nSubBands=500, timeRes=42e-3, freqRes=200, durationSec=3600 ) >>> tfconf.volume 654.83619 Gibyte >>> from nenupy.observation import TFConfig >>> tfconf = TFConfig.fromParset( 'nenufar_obs.parset' ) >>> tfconf.volume XXX Gibyte .. note:: Combinations of ``timeRes`` and ``freqRes`` pairs are limited to that available within the *UnDySPuTeD* receiver. If set otherwise, the closest allowed values will be filled instead. One can check the corresponding attributes after setting up the desired configuration: >>> tfconf = TFConfig( nSubBands=500, timeRes=1e-3, freqRes=200, durationSec=3600 ) >>> tfconf.timeRes 0.02097152 >>> tfconf.freqRes 190.73486328125 Altenatively, one can set the log to ``DEBUG`` in order to print conversion details: >>> import logging >>> logging.getLogger('nenupy').setLevel(logging.DEBUG) >>> nriconf = TFConfig( nSubBands=500, timeRes=1e-3, freqRes=200, durationSec=3600 ) 2020-12-16 17:28:52 -- DEBUG: 'freqRes=0.20 kHz', 'timeRes=1.00 ms' <--> 'df=0.19 kHz', 'dt=20.97 ms' ('fftlen=1024', 'nfft2int=4') """ vol = 0 * u.Gibyte for bc in self._beamConfigs: vol += bc.volume return vol # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (str, Parset)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.TFConfig` instance in which *UnDySPuTeD-TF* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *UnDySPuTeD-TF* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.TFConfig` :Example: >>> from nenupy.observation import TFConfig >>> tfconf = TFConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output digibeams = parset.digibeams tf = TFConfig(_setFromParset=True) tf.startTime = parset.observation['startTime'] beamConfigs = [] if 'undysputed' not in out['hd_receivers']: # UnDySPuTeD receiver has not been used pass else: for db in digibeams.keys(): if digibeams[db]['toDo'].lower() != 'dynamicspectrum': continue try: mode, parameters = tf._parseParameters(digibeams[db]['parameters']) except KeyError: log.warning( "Parset '{}' has no 'parameters' key.".format(parset.parset) ) continue if mode != 'tf': continue dt, df = tf._checkDFvsDT( dt=(float(parameters['dt'])*u.ms).to(u.s).value, df=(float(parameters['df'])*u.kHz).to(u.Hz).value ) beamConfigs.append( _TFBeamConfig( timeRes=dt, freqRes=df, durationSec=digibeams[db]['duration'], nSubBands=len(digibeams[db]['subbandList']) ) ) tf._beamConfigs = beamConfigs return tf
# ============================================================= # # ============================================================= # # ============================================================= # # ---------------------- _RawBeamConfig ----------------------- # # ============================================================= # class _RawBeamConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, **kwargs): super().__init__(backend='raw', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ """ log.debug(str(self)) duration = self.durationSec - 60 # Burning time at start nBytes = self.nBits / 8 * u.byte rateSB = self.nPolars * nBytes / 5.12e-6 rateObs = rateSB * self.nSubBands return (rateObs * duration).to(u.Gibyte) # ============================================================= # # ============================================================= # # ============================================================= # # ------------------------- RAWConfig ------------------------- # # ============================================================= #
[docs] @doc('*UnDySPuTeD Waveform* mode observation configuration.', 'raw') class RAWConfig(_UnDySPuTeDConfig): def __init__(self, _setFromParset=False, **kwargs): if not _setFromParset: super().__init__(backend='raw', **kwargs) self._beamConfigs = [ _RawBeamConfig(**kwargs) ] else: self._beamConfigs = [] # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of an *UnDySPuTeD-RAW* observation file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import RAWConfig >>> rawconf = RAWConfig( durationSec=3600 ) >>> rawconf.volume 494.53229 Gibyte >>> from nenupy.observation import RAWConfig >>> rawconf = RAWConfig.fromParset( 'nenufar_obs.parset' ) >>> rawconf.volume XXX Gibyte """ vol = 0 * u.Gibyte for bc in self._beamConfigs: vol += bc.volume return vol # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.RAWConfig` instance in which *UnDySPuTeD-RAW* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *UnDySPuTeD-RAW* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.RAWConfig` :Example: >>> from nenupy.observation import RAWConfig >>> rawconf = RAWConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output digibeams = parset.digibeams raw = RAWConfig(_setFromParset=True) raw.startTime = parset.observation['startTime'] beamConfigs = [] if 'undysputed' not in out['hd_receivers']: # UnDySPuTeD receiver has not been used pass else: for db in digibeams.keys(): if digibeams[db]['toDo'].lower() != 'waveform': continue beamConfigs.append( _RawBeamConfig( durationSec=digibeams[db]['duration'], nSubBands=len(digibeams[db]['subbandList']) ) ) raw._beamConfigs = beamConfigs return raw
# ============================================================= # # ============================================================= # # ============================================================= # # ---------------------- _FoldBeamConfig ---------------------- # # ============================================================= # class _FoldBeamConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, **kwargs): super().__init__(backend='pulsar_fold', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ """ log.debug(str(self)) duration = self.durationSec - 60 # Burning time at start duration = 0 if duration < 0 else duration rateObs = self.nSubBands * self.nPolars * float32 * self.nBins / self.tFold return (rateObs * duration).to(u.Gibyte) # ============================================================= # # ============================================================= # # ============================================================= # # ---------------------- PulsarFoldConfig --------------------- # # ============================================================= #
[docs] @doc('*UnDySPuTeD Pulsar-FOLD* mode observation configuration.', 'pulsar_fold') class PulsarFoldConfig(_UnDySPuTeDConfig): def __init__(self, _setFromParset=False, **kwargs): if not _setFromParset: super().__init__(backend='pulsar_fold', **kwargs) self._beamConfigs = [ _FoldBeamConfig(**kwargs) ] else: self._beamConfigs = [] # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of an *UnDySPuTeD Pulsar-FOLD* observation file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import PulsarFoldConfig >>> foldconf = PulsarFoldConfig( durationSec=3600 ) >>> foldconf.volume 1.9317667 Gibyte >>> from nenupy.observation import PulsarFoldConfig >>> foldconf = PulsarFoldConfig.fromParset( 'nenufar_obs.parset' ) >>> foldconf.volume XXX Gibyte """ vol = 0 * u.Gibyte for bc in self._beamConfigs: vol += bc.volume return vol # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.PulsarFoldConfig` instance in which *UnDySPuTeD Pulsar-FOLD* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *UnDySPuTeD Pulsar-FOLD* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.PulsarFoldConfig` :Example: >>> from nenupy.observation import PulsarFoldConfig >>> foldconf = PulsarFoldConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output digibeams = parset.digibeams fold = PulsarFoldConfig(_setFromParset=True) fold.startTime = parset.observation['startTime'] beamConfigs = [] if 'undysputed' not in out['hd_receivers']: # UnDySPuTeD receiver has not been used pass else: for db in digibeams.keys(): if digibeams[db]['toDo'].lower() != 'pulsar': continue try: mode, parameters = fold._parseParameters( digibeams[db]['parameters'], pulsar=True ) except KeyError: log.warning( "Parset '{}' has no 'parameters' key.".format(parset.parset) ) continue if mode != 'fold': continue props = backendProperties['pulsar_fold'] beamConfigs.append( _FoldBeamConfig( nSubBands=len(digibeams[db]['subbandList']), nPolars=1 if 'onlyi' in parameters else 4, tFold=float(parameters.get('tfold', props['tFold']['default'])), durationSec=digibeams[db]['duration'], nBins=int(parameters.get('nbin', props['nBins']['default'])) ) ) fold._beamConfigs = beamConfigs return fold
# ============================================================= # # ============================================================= # # ============================================================= # # -------------------- _WaveolafBeamConfig -------------------- # # ============================================================= # class _WaveolafBeamConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, **kwargs): super().__init__(backend='pulsar_waveolaf', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ """ log.debug(str(self)) duration = self.durationSec - 60 # Burning time at start duration = 0 if duration < 0 else duration rateObs = 451590 * u.byte * self.nSubBands return (rateObs * duration).to(u.Gibyte) # ============================================================= # # ============================================================= # # ============================================================= # # ---------------------- PulsarWaveConfig --------------------- # # ============================================================= #
[docs] @doc('*UnDySPuTeD Pulsar-WAVEOLAF* mode observation configuration.', 'pulsar_waveolaf') class PulsarWaveConfig(_UnDySPuTeDConfig): def __init__(self, _setFromParset=False, **kwargs): if not _setFromParset: super().__init__(backend='pulsar_waveolaf', **kwargs) self._beamConfigs = [ _WaveolafBeamConfig(**kwargs) ] else: self._beamConfigs = [] # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of an *UnDySPuTeD Pulsar-WAVEOLAF* observation file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import PulsarWaveConfig >>> waveconf = PulsarWaveConfig( durationSec=3600 ) >>> waveconf.volume 285.85707 Gibyte >>> from nenupy.observation import PulsarWaveConfig >>> waveconf = PulsarWaveConfig.fromParset( 'nenufar_obs.parset' ) >>> waveconf.volume XXX Gibyte """ vol = 0 * u.Gibyte for bc in self._beamConfigs: vol += bc.volume return vol # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.PulsarWaveConfig` instance in which *UnDySPuTeD Pulsar-WAVEOLAF* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *UnDySPuTeD Pulsar-WAVEOLAF* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.PulsarWaveConfig` :Example: >>> from nenupy.observation import PulsarWaveConfig >>> waveconf = PulsarWaveConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output digibeams = parset.digibeams wave = PulsarWaveConfig(_setFromParset=True) wave.startTime = parset.observation['startTime'] beamConfigs = [] if 'undysputed' not in out['hd_receivers']: # UnDySPuTeD receiver has not been used pass else: for db in digibeams.keys(): if digibeams[db]['toDo'].lower() != 'pulsar': continue try: mode, parameters = wave._parseParameters( digibeams[db]['parameters'], pulsar=True ) except KeyError: log.warning( "Parset '{}' has no 'parameters' key.".format(parset.parset) ) continue if mode != 'waveolaf': continue props = backendProperties['pulsar_waveolaf'] beamConfigs.append( _WaveolafBeamConfig( nSubBands=len(digibeams[db]['subbandList']), durationSec=digibeams[db]['duration'] ) ) wave._beamConfigs = beamConfigs return wave
# ============================================================= # # ============================================================= # # ============================================================= # # --------------------- _SingleBeamConfig --------------------- # # ============================================================= # class _SingleBeamConfig(_BackendConfig): """ .. versionadded:: 1.2.0 """ def __init__(self, **kwargs): super().__init__(backend='pulsar_single', **kwargs) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ """ log.debug(str(self)) duration = self.durationSec - 60 # Burning time at start duration = 0 if duration < 0 else duration nBytes = self.nBits / 8 * u.byte rateObs = self.nSubBands * self.nPolars * nBytes /5.12e-6 / self.dsTime return (rateObs * duration).to(u.Gibyte) # ============================================================= # # ============================================================= # # ============================================================= # # --------------------- PulsarSingleConfig -------------------- # # ============================================================= #
[docs] @doc('*UnDySPuTeD Pulsar-SINGLE* mode observation configuration.', 'pulsar_single') class PulsarSingleConfig(_UnDySPuTeDConfig): def __init__(self, _setFromParset=False, **kwargs): if not _setFromParset: super().__init__(backend='pulsar_single', **kwargs) self._beamConfigs = [ _SingleBeamConfig(**kwargs) ] else: self._beamConfigs = [] # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume of an *UnDySPuTeD Pulsar-SINGLE* observation file. :getter: Data volume. :type: :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import PulsarSingleConfig >>> singleconf = PulsarSingleConfig( durationSec=3600 ) >>> singleconf.volume 15.454134 Gibyte >>> from nenupy.observation import PulsarSingleConfig >>> singleconf = PulsarSingleConfig.fromParset( 'nenufar_obs.parset' ) >>> singleconf.volume XXX Gibyte """ vol = 0 * u.Gibyte for bc in self._beamConfigs: vol += bc.volume return vol # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.PulsarSingleConfig` instance in which *UnDySPuTeD Pulsar-SINGLE* observation configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: *UnDySPuTeD Pulsar-SINGLE* configuration as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.PulsarSingleConfig` :Example: >>> from nenupy.observation import PulsarSingleConfig >>> waveconf = PulsarSingleConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) out = parset.output digibeams = parset.digibeams single = PulsarSingleConfig(_setFromParset=True) single.startTime = parset.observation['startTime'] beamConfigs = [] if 'undysputed' not in out['hd_receivers']: # UnDySPuTeD receiver has not been used pass else: for db in digibeams.keys(): if digibeams[db]['toDo'].lower() != 'pulsar': continue try: mode, parameters = single._parseParameters( digibeams[db]['parameters'], pulsar=True ) except KeyError: log.warning( "Parset '{}' has no 'parameters' key.".format(parset.parset) ) continue if mode != 'single': continue props = backendProperties['pulsar_single'] beamConfigs.append( _SingleBeamConfig( nSubBands=len(digibeams[db]['subbandList']), nPolars=1 if 'onlyi' in parameters else 4, dsTime=int(parameters.get('dstime', props['dsTime']['default'])), durationSec=digibeams[db]['duration'], nBits=int(parameters.get('nbits', props['nBits']['default'])) ) ) single._beamConfigs = beamConfigs return single
# ============================================================= # # ============================================================= # backendClasses = { 'nickel': NICKELConfig, 'raw': RAWConfig, 'tf': TFConfig, 'bst': BSTConfig, # 'sst': '', # 'xst': '', 'pulsar_fold': PulsarFoldConfig, 'pulsar_waveolaf': PulsarWaveConfig, 'pulsar_single': PulsarSingleConfig } # ============================================================= # # ------------------------- ObsConfig ------------------------- # # ============================================================= #
[docs] class ObsConfig(object): """ Main observation configuration class. .. versionadded:: 1.2.0 """ def __init__(self): for key in backendClasses: setattr(self, key, []) def __add__(self, other): if not isinstance(other, self.__class__): raise TypeError('{} expected.'.format(self.__class__)) new = ObsConfig() for key in backendClasses: summedVal = getattr(self, key) + getattr(other, key) setattr(new, key, summedVal) return new # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def volume(self): """ Computes an estimation of the data volume output for all the *NenuFAR* receivers. If the object :class:`~nenupy.observation.obs_config.ObsConfig` has been set with several parset files (with the method :meth:`~nenupy.observation.obs_config.ObsConfig.fromParsetList`), the volumes are summed over all observations. :getter: Data volume. :type: `dict` of :class:`~astropy.units.Quantity` :Example: >>> from nenupy.observation import ObsConfig >>> obsconf = ObsConfig.fromParset( 'nenufar_obs.parset' ) >>> obsconf.volume {'nickel': <Quantity 0. Gibyte>, 'raw': <Quantity 0. Gibyte>, 'tf': <Quantity 0. Gibyte>, 'bst': <Quantity 20.625 Mibyte>, 'pulsar_fold': <Quantity 3.7763691 Gibyte>, 'pulsar_waveolaf': <Quantity 558.79404545 Gibyte>, 'pulsar_single': <Quantity 0. Gibyte>} """ volumes = {} for key in backendClasses.keys(): volumes[key] = sum([subconf.volume for subconf in getattr(self, key)]) return volumes # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod @accepts(type, (Parset, str)) def fromParset(cls, parset): """ Returns a :class:`~nenupy.observation.obs_config.ObsConfig` instance in which all *NenuFAR* receiver configuration properties are set as defined by the ``parset``. :param parset: Observation parset file. :type parset: `str` or :class:`~nenupy.observation.parset.Parset` :returns: Full *NenuFAR* receiver configurations as defined by the ``parset`` file. :rtype: :class:`~nenupy.observation.obs_config.ObsConfig` :Example: >>> from nenupy.observation import ObsConfig >>> obsconf = ObsConfig.fromParset('nenufar_obs.parset') """ if isinstance(parset, str): parset = Parset(parset) dv = ObsConfig() for key in backendClasses.keys(): #setattr(dv, key, backendClasses[key].fromParset(parset)) getattr(dv, key).append(backendClasses[key].fromParset(parset)) return dv
[docs] @classmethod @accepts(type, list) def fromParsetList(cls, parsetList): """ Returns a :class:`~nenupy.observation.obs_config.ObsConfig` instance in which all *NenuFAR* receiver configuration properties are set as defined by each parset file conained in ``parsetList``. :param parsetList: List of observation parset files. :type parsetList: `list` of `str` or :class:`~nenupy.observation.parset.Parset` :returns: *NenuFAR* receiver configurations for all observations defined by the parset files listed in ``parsetList``. :rtype: :class:`~nenupy.observation.obs_config.ObsConfig` :Example: >>> from nenupy.observation import ObsConfig >>> obsconf = ObsConfig.fromParsetList( ['nenufar_obs_1.parset', 'nenufar_obs_2.parset'] ) """ if not isinstance(parsetList, list): raise TypeError( "`parsetList` should be a `list`." ) tot = ObsConfig() for parset in parsetList: obs = ObsConfig.fromParset(parset) tot += obs return tot
[docs] @accepts(object, str, (str, u.Quantity), strict=False) def getCumulativeVolume(self, receiver, unit='Tibyte'): """ Gets an estimation of the cumulative raw data volume over time computed from the observations listed in the current :class:`~nenupy.observation.obs_config.ObsConfig` instance for the given ``receiver``. :param receiver: Name of the receiver from which the cumulative data volume is estimated. :type receiver: `str` :param unit: Data volume unit in which the cumulative volume will be expressed (see also `binary unit prefixes <https://docs.astropy.org/en/stable/units/standard_units.html#prefixes>`_). :type unit: `str` or :class:`~astropy.units.Quantity` :returns: Observation start times and cumulative data volumes. :rtype: (:class:`~astropy.time.Time`, :class:`~numpy.ndarray`) :Example: >>> from nenupy.observation import ObsConfig >>> obsconf = ObsConfig.fromParsetList( ['nenufar_obs_1.parset', 'nenufar_obs_2.parset'] ) >>> times, volumes = obsconf.getCumulativeVolume( receiver='nickel', unit='Gibyte' ) """ if receiver not in backendClasses.keys(): raise ValueError( f"Receiver '{receiver}' not in '{backendClasses.keys()}'" ) obs_list = getattr(self, receiver) times = Time([obs.startTime for obs in obs_list]) indices = np.argsort(times.mjd) times = times[indices] volumes = np.array([obs.volume.to(unit).value for obs in obs_list]) cumVol = np.cumsum(volumes[indices]) return times, cumVol
[docs] def plotCumulativeVolume(self, figname='', **kwargs): """ Plots the cumulative raw data volume estimation. :param figname: Figure name to store. If set to ``''`` (by default), the figure is only displayed. :type figname: `str` :param figsize: Figure size in inches (default: ``(10, 5)``). :type figsize: `tuple` :param unit: Data volume unit in which the cumulative volume will be expressed (see also `binary unit prefixes <https://docs.astropy.org/en/stable/units/standard_units.html#prefixes>`_). Default is ``'Tibyte'``. :type unit: `str` or :class:`~astropy.units.Quantity` :param receivers: List of receivers whose cumulative data volumes must be plotted. Default: all available *NenuFAR* receivers. :type receivers: `list` of `str` :param scale: y-axis scaling (``'linear'`` or ``'log'``). :type scale: `str` :param title: Title of the plot. :type title: `str` :param grid: Add a grid to help the visualization. Default is ``True``. :type grid: `bool` :param tMin: Minimum time to represent. :type tMin: `str` or :class:`~astropy.time.Time` :param tMax: Maximum time to represent. :type tMax: `str` or :class:`~astropy.time.Time` """ import matplotlib.pylab as plt from itertools import cycle fig = plt.figure( figsize=kwargs.get('figsize', (10, 5)) ) unit = kwargs.get('unit', 'Tibyte') receivers = kwargs.get('receivers', list(backendClasses.keys())) if not isinstance(receivers, list): raise TypeError( "`receivers` must be set as a list." ) lStyles = [ 'solid', 'dotted', 'dashed', 'dashdot', (0, (5, 5)), # loose dashed (0, (3, 5, 1, 5, 1, 5)), # dashdotteddotted (0, (3, 1, 1, 1, 1, 1)) # dense dashdotteddotted ] linecycler = cycle(lStyles) volCumDico = {} for receiver in receivers: times, cumVol = self.getCumulativeVolume( receiver=receiver, unit=unit ) volCumDico[receiver] = { 'times': times, 'cumulative_sum': cumVol } plt.plot( times.datetime, cumVol, label=receiver, linewidth=1, linestyle=next(linecycler) ) plt.plot( times.datetime, sum([volCumDico[k]['cumulative_sum'] for k in volCumDico.keys()]), label='Total', color='black', linestyle='solid', linewidth=2, ) plt.yscale(kwargs.get('scale', 'linear')) plt.legend() plt.xlabel('UTC Time') plt.ylabel(f'Raw data volume ({unit})') plt.title(kwargs.get('title', '')) plt.xlim( ( Time(kwargs.get('tMin', times[0])).datetime, Time(kwargs.get('tMax', times[-1])).datetime ) ) if kwargs.get('grid', True): plt.grid() if figname == '': plt.show() else: plt.savefig( figname, dpi=300, transparent=True, bbox_inches='tight' )
# ============================================================= # # ============================================================= #