nenupy.io.tf.TFTask

class nenupy.io.tf.TFTask(name, func, args_to_update=[], repeatable=False)[source]

Bases: object

Class to handle a single task/operation designed to be applied to time-frequency data.

Example

Here is a complete example of how a TFTask works on its own.

>>> from nenupy.io.tf import TFTask
>>> from nenupy.io.tf_utils import TFPipelineParameters, _ValueParameter
>>> import numpy as np

# Generate a dataset
>>> data = np.ones((5, 2, 1)) # time, frequency, polarization

# Define a function that modifies the data
>>> def multiply_by_first_axis_index(data: np.ndarray, scale: float) -> np.ndarray:
>>>    '''A completely made-up function'''
>>>    first_axis = np.arange(data.shape[0])
>>>    return scale * data * first_axis[:, None, None]

# Generate an object listing parameters available to the function(s)
>>> custom_parameters = TFPipelineParameters(
        _ValueParameter(
            name="scale",
            default=1.,
            param_type=float,
            min_val=0.1,
            max_val=2.0
        )
    )

# Generate a TFTask object
>>> custom_task = TFTask(
        name="My task - custom",
        func=multiply_by_first_axis_index,
        args_to_update=["scale"]
    )

# Modify the value of the pipeline parameter
>>> custom_parameters["scale"] = 1.5

# Update the TFTask, the new parameter value is taken into account
>>> custom_task.update(parameters=custom_parameters)

# Call the TFTask and run it
>>> time_unix, frequency_hz, result = custom_task(
        time_unix=None,
        frequency_hz=None,
        data=data
    )

>>> print(result[:, 0, :].ravel())
[0.  1.5 3.  4.5 6. ]
__call__(time_unix, frequency_hz, data, **kwds)[source]

Run the TFTask. Before running the tasks, it is recommended to call update() first in order to take into account the most up-to-date values of the parmaeters that may be used by the task.

Parameters:
  • time_unix (ndarray) – Time description of the data array, in unix format (array of float)

  • frequency_hz (ndarray) – Frequency description of the data array, in Hz (array of float)

  • data (ndarray or Array) – Data to be processed

  • **kwds (Any) – Extra keyword arguments directly passed to the embedded function

Returns:

(time_unix, frequency_hz, processed data)

Return type:

Tuple[ndarray, ndarray, ndarray or Array]

__init__(name, func, args_to_update=[], repeatable=False)[source]

Generate a TFTask instance.

Parameters:
  • name (str) – Name of the task (it can be anything)

  • func (Callable) – Function that applies the operation to the data

  • args_to_update (List[str], optional) – List of parameter names that are extracted from TFPipelineParameters and are required by func. These parameters will be updated by their current values (stored in TFPipelineParameters) prior to running the task., by default []

  • repeatable (bool, optional) – Allow or not the task to be called several times by a given pipeline, by default False

Methods

__init__(name, func[, args_to_update, ...])

Generate a TFTask instance.

correct_bandpass()

TFTask to correct for the sub-band bandpass response.

correct_faraday_rotation()

TFTask calling de_faraday_data() to correct for Faraday rotation for a given 'rotation_measure' set in parameters.

correct_parallactic_rotation()

TFTask calling correct_parallactic().

correct_polarization()

TFTask calling apply_dreambeam_corrections().

de_disperse()

TFTask calling de_disperse_array() to de-disperse the data using the 'dispersion_measure' set in parameters.

flatten_subband()

TFTask to flatten each sub-band bandpass.

frequency_rebin()

TFTask to re-bin the data in frequency.

get_stokes()

TFTask to compute the Stokes parameters by calling compute_stokes_parameters().

mitigate_frequency_rfi([sigma_clip, ...])

TFTask to perform RFI mitigation by sigma clipping using mitigate_rfi_along_axis().

mitigate_time_rfi([sigma_clip, ...])

TFTask to perform RFI mitigation by sigma clipping using mitigate_rfi_along_axis().

remove_channels()

TFTask calling remove_channels_per_subband() to set a list of sub-band channels to NaN values.

time_rebin()

TFTask to re-bin the data in time.

update(parameters)

Update the embedded function's arguments to those listed in parameters that match args_to_update.

Attributes

args_to_update

List of embedded function's arguments to update before running the function.

is_active

Attribute modified by update() which tells whether the configured parameters enable the embedded function usage.

property args_to_update

List of embedded function’s arguments to update before running the function. If func contains keyword arguments, their values will be updated by the matching key value during the call of update().

Returns:

List of function’s arguments to update()

Return type:

List[str]

Raises:

KeyError – Raised if any listed argument does not correspond with one of the function’s.

classmethod correct_bandpass()[source]

TFTask to correct for the sub-band bandpass response.

A Poly-Phase Filter is involved in the NenuFAR data acquisition pipeline to split the data stream into sub-bands. The combination of the filter shape and a Fourier transform results in a non-flat response across each sub-band. This TFTask calls the correct_bandpass() function.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.correct_bandpass())
>>> data = sp.get(...)
../_images/tf_bandpass_correction.png
classmethod correct_faraday_rotation()[source]

TFTask calling de_faraday_data() to correct for Faraday rotation for a given 'rotation_measure' set in parameters.

Example

>>> from nenupy.io.tf import Spectra, TFTask
>>> import astropy.units as u

>>> sp_pulsar = Spectra(".../pulsar.spectra")
>>> sp_pulsar.pipeline.set_default()
>>> sp_pulsar.pipeline.insert(TFTask.de_disperse(), 1)
>>> sp_pulsar.pipeline.insert(TFTask.correct_faraday_rotation(), 2)
>>> sp_pulsar.pipeline.info()
>>> defaraday_data = sp_pulsar.get(
        tmin="2023-10-02T08:40:40.8500000",
        tmax="2023-10-02T08:40:43.16000000",
        fmin=50,
        fmax=65,
        stokes=["U/I"],
        remove_channels=[0, 1, -1],
        rebin_df = 12207.03125 * 4 * u.Hz,
        dispersion_measure=2.972719 * u.pc / u.cm**3,
        rotation_measure=5.9 * u.rad / u.m**2
    )
>>> defaraday_data[:600, :, :].plot(polarization="U/I", db=False)
../_images/tf_pulsar_defaraday.png

Notes

In the above example, we first had to de-disperse the data with de_disperse().

classmethod correct_parallactic_rotation()[source]

TFTask calling correct_parallactic(). This allows for parallactic angle correction by aplying the inverse rotation.

Warning

This task must be computed early in the pipeline process as it involves full Jones operations.

classmethod correct_polarization()[source]

TFTask calling apply_dreambeam_corrections(). This allows for beam correction (including leakage and parallactic angle correction).

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> from astropy.coordinates import SkyCoord
>>> import astropy.units as u

>>> sp = Spectra("/my/solar_observation.spectra")
>>> selection_tmin = sp.time_min
>>> selection_tmax = sp.time_max
>>> selection_fmin = 50 * u.MHz
>>> selection_fmax = 50.2 * u.MHz
>>> rebin_df = 1 * u.MHz
>>> rebin_dt = 60 * u.s
>>> mean_time = selected_time_min + (selected_time_max - selected_time_min) / 2
>>> sun = SolarSystemTarget.from_name("Sun", mean_time).coordinates

>>> sp.pipeline.insert(TFTask.correct_polarization(), 1)
>>> data_i_corr = sp.get(
        tmin=selection_tmin,
        tmax=selection_tmax,
        fmin=selection_fmin,
        fmax=selection_fmax,
        beam=0,
        stokes="I",
        remove_channels=[0, 1, -1],
        rebin_dt=rebin_dt,
        rebin_df=rebin_df,
        calib_dt= 20 * u.s,
        skycoord=sun,
        dreambeam_parallactic=True
    )
../_images/tf_correct_polar.png

In this above example, the solar observation lasts for almost seven hours. During the tracking of the Sun, a variety of NenuFAR antenna gains are then explored. Since the dipoles are less sensitive near the horizon, the blue curve shows a bell-like pattern with minima at the time boundaries (when the Sun is at its lowest elevations) and a maximum during the culmination. The corrected data, in orange, are flatter, indicative of the beam correction.

Warning

This task must be computed early in the pipeline process as it involves full Jones operations. As of now, NenuFAR cross-polarization terms are not fully taken into account by DreamBeam. Results may vary and the tests we have made are not yet conclusive. We are working on an improved version.

classmethod de_disperse()[source]

TFTask calling de_disperse_array() to de-disperse the data using the 'dispersion_measure' set in parameters.

Example

>>> from nenupy.io.tf import Spectra, TFTask
>>> import astropy.units as u

>>> sp_pulsar = Spectra(".../pulsar.spectra")
>>> sp_pulsar.pipeline.set_default()
>>> sp_pulsar.pipeline.insert(TFTask.de_disperse(), 1)
>>> sp_pulsar.pipeline.info()
>>> dedisperse_data = sp_pulsar.get(
        tmin="2023-10-02T08:40:40.8500000",
        tmax="2023-10-02T08:40:43.16000000",
        fmin=50,
        fmax=65,
        stokes=["I"],
        remove_channels=[0, 1, -1],
        rebin_df = 12207.03125 * 4 *u.Hz,
        dispersion_measure=2.972719 *u.pc / (u.cm**3)
    )
>>> dedisperse_data.plot(polarization="I", db=True)
../_images/tf_pulsar_dedispersion.png

Warning

Due to the configuration of the underlying Array, its dask.array.Array.compute() method has to be applied priori to de-dispersing the data. Therefore, a potential huge data volume may be computed at once. By default, a security exception is raised to prevent computing a too large data set. To bypass this limit, set 'ignore_volume_warning' of parameters to True.

classmethod flatten_subband()[source]

TFTask to flatten each sub-band bandpass. Based on the temporal median over each suband, a linear correction is applied to flatten the signal. This TFTask calls the flatten_subband() function.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.flatten_subband())
>>> data = sp.get(...)
../_images/tf_sb_flatten.png

Warning

This correction assumes that the signal’s spectrum could be considered flat at the sub-band resolution. The method is not recommended for data other than Stokes I.

classmethod frequency_rebin()[source]

TFTask to re-bin the data in frequency. The targetted frequency resolution is defined by the 'rebin_df' argument, set in parameters. The minimum value is the frequency resolution of the data, i.e. df. This TFTask calls the rebin_along_dimension() function.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> import astropy.units as u

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.frequency_rebin())
>>> result = sp.get(
        rebin_df=50 * u.kHz
    )
../_images/tf_frequency_rebin.png
classmethod get_stokes()[source]

TFTask to compute the Stokes parameters by calling compute_stokes_parameters(). The computed Stokes parameters are defined by the 'stokes' argument, set in parameters.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.get_stokes())
>>> result = sp.get(stokes=["I", "U/I", "Q/I", "V/I"])

>>> result.plot(polarization="U/I") # to display
../_images/tf_stokes.png
property is_active

Attribute modified by update() which tells whether the configured parameters enable the embedded function usage.

Returns:

Whether the task will be run by the pipeline or not

Return type:

bool

classmethod mitigate_frequency_rfi(sigma_clip=2, polynomial_degree=5)[source]

TFTask to perform RFI mitigation by sigma clipping using mitigate_rfi_along_axis(). This task specifically catch outliers by comparison with the smoothed median spectral profile of the selected dataset. Several instances of this task may be chained together to clip iteratively the data. To obtain better results, this can be combined with mitigate_time_rfi().

Parameters:

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.mitigate_frequency_rfi(sigma_clip=2, polynomial_degree=4))
>>> result = sp.get()
../_images/tf_rfimitigation_freq.png
classmethod mitigate_time_rfi(sigma_clip=2, polynomial_degree=5)[source]

TFTask to perform RFI mitigation by sigma clipping using mitigate_rfi_along_axis(). This task specifically catch outliers by comparison with the smoothed median time profile of the selected dataset. Several instances of this task may be chained together to clip iteratively the data. To obtain better results, this can be combined with mitigate_frequency_rfi().

Parameters:

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.mitigate_time_rfi(sigma_clip=2, polynomial_degree=4))
>>> result = sp.get()
../_images/tf_rfimitigation_time.png
classmethod remove_channels()[source]

TFTask calling remove_channels_per_subband() to set a list of sub-band channels to NaN values.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline

>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.remove_channels())
>>> result = sp.get(
        remove_channels=[0, 2, 4, 10, -1]
    )
../_images/tf_remove_channels.png
classmethod time_rebin()[source]

TFTask to re-bin the data in time. The targetted time resolution is defined by the 'rebin_dt' argument, set in parameters. This TFTask calls the rebin_along_dimension() function.

Example

>>> from nenupy.io.tf import Spectra, TFTask, TFPipeline
>>> import astropy.units as u
>>> sp = Spectra("/my/file.spectra")
>>> sp.pipeline = TFPipeline(sp, TFTask.time_rebin())

Then, either perform a one_time application of the rebin_dt parameter (that is forgotten after the get() call):

>>> data = sp.get(..., rebin_dt=0.2*u.s,...)

Or, set it for further usage:

>>> sp.pipeline.parameters["rebin_dt"] = 0.2*u.s
>>> data = sp.get(...)
../_images/tf_time_rebin.png
update(parameters)[source]

Update the embedded function’s arguments to those listed in parameters that match args_to_update. Determine also if the TFTask is active or not, i.e., if the configured parameters enable it to be run.

Parameters:

parameters (TFPipelineParameters) – Pipeline parameters object, it must contain the parameter requested by args_to_update.

Raises:

KeyError – Raised if the parameters listed in args_to_update do not correspond with those listed in parameters.