UnDySPuTeD Time-Frequency Data
UnDySPuTeD (Unified Dynamic Spectrum Pulsar and Time Domain receiver) is the receiver of the NenuFAR beamformer mode. It is fed by the LANewBa backend with the signals of up to 96 core Mini-Arrays (2 polarizations).
The raw data flow from LANewBa consists of 195312.5 pairs
of complex X and Y values per second per beamlet. Each subband
(of 195.3125 kHz) is first split into fftlen channels,
with fftlen ranging from 16 to 2048 (to achieve a
frequency resolution of \(\delta \nu = 12 - 0.1\, \rm{kHz}\)
respectively). After computation of cross and auto-correlations,
the data are downsampled in time by integrating nfft2int spectra,
with nfft2int ranging from 4 to 1024 (implying a time resolution \(\delta t = 195312.5^{-1} \times \rm{fftlen} \times \rm{nfft2int}\),
e.g. \(\delta t = 0.3 - 83.9\, \rm{ms}\) for fftlen = 16).
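As a quick check of these figures, the short sketch below recomputes the channel width and the time resolution from fftlen and nfft2int. The function and constant names are illustrative only; they are not part of nenupy.

```python
# Illustrative helper (not part of nenupy): recompute the resolutions
# from the receiver settings described above.
SUBBAND_WIDTH_HZ = 195312.5  # subband width, also the sample rate per beamlet


def resolutions(fftlen, nfft2int):
    """Return (df in Hz, dt in seconds) for a given configuration."""
    df = SUBBAND_WIDTH_HZ / fftlen             # channel width within a subband
    dt = fftlen * nfft2int / SUBBAND_WIDTH_HZ  # duration of one integrated spectrum
    return df, dt


df, dt = resolutions(fftlen=64, nfft2int=64)
print(f"df = {df:.4f} Hz, dt = {dt * 1e3:.5f} ms")
```

With fftlen = 64 and nfft2int = 64, this gives a channel width close to 3.05 kHz and a time resolution close to 20.97 ms.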
Each NenuFAR/UnDySPuTeD/DynSpec observation results in the
production of several proprietary formatted files ('*.spectra'),
each corresponding to an individual lane of the UnDySPuTeD receiver.
Depending on the observation configuration, the bandwidth and/or
the different observed beams (i.e., beamforming in different sky
directions) can be spread across these files.
See also
Reading a spectra file
tf is the module designed to
read and analyze UnDySPuTeD DynSpec high-rate data. It
relies on Dask, which makes it
possible to read larger-than-memory data sets and to apply
complex pipelines to them.
The class Spectra offers
the possibility to read and analyze these observation files:
>>> from nenupy.io import Spectra
>>> sp = Spectra(filename="my_file_0.spectra")
Note
By default, a check on missing data is applied.
This ensures that, later on, correct time information is displayed and data are properly flagged on the rare occasions when time blocks are missing.
However, this procedure requires a data scan which, albeit shallow, takes time and scales with data size.
To avoid this time-consuming check, the user may set the option check_missing_data=False.
We recommend keeping the check enabled (check_missing_data=True) in any case other than quick data inspection.
>>> sp = Spectra(filename="my_file_0.spectra", check_missing_data=False)
Once a DynSpec file is ‘lazy’-read/loaded (i.e., without
being directly stored in memory), and before applying any processing,
it might be handy to check the data properties.
Basic information may be displayed by the info() method:
>>> sp.info()
filename: my_file_0.spectra
time_min: 2023-05-27T08:39:02.0000050
time_max: 2023-05-27T08:59:34.2445748
dt: 20.97152 ms
frequency_min: 19.921875 MHz
frequency_max: 57.421875 MHz
df: 3.0517578125 kHz
Available beam indices: ['0']
Pipeline configuration
Data access and processing are both achieved through a single method, get().
The processing itself is defined by configuring the pipeline that is
applied every time get() is called.
Predefined pipeline steps
Each Spectra object contains an associated
TFPipeline object (stored in its
pipeline attribute).
Depending on the pipeline configuration, several TFTask objects
may be called in succession to process the data.
The TFPipeline consists of several steps, or tasks,
which can be displayed using info():
>>> sp.pipeline.info()
Pipeline configuration:
0 - Correct bandpass
(1 - Remove subband channels)
(2 - Rebin in time)
(3 - Rebin in frequency)
4 - Compute Stokes parameters
Note
Some tasks are displayed in parentheses, which means that even though they are included in the pipeline, the current configuration does not make them do anything to the data. For instance, no channels to be flagged are listed, or no time/frequency rebin values have been specified.
The default pipeline configuration (which can be restored by calling set_default()) consists of the following TFTask objects:
Bandpass correction (correct_bandpass())
Remove channels at the subband edges (remove_channels())
Rebin the data in time (time_rebin())
Rebin the data in frequency (frequency_rebin())
Convert the data to Stokes parameters (get_stokes())
See also
TFTask lists all the available predefined tasks; see Special pipeline tasks for further explanation of the more advanced ones.
The pipeline tasks take their configuration from the parameters listed in the
parameters attribute (which returns a
TFPipelineParameters object).
The current state of these parameters can be displayed by calling
info():
>>> print( sp.pipeline.parameters.info() )
channels: 64
dt: 0.02097152 s
df: 3051.7578125 Hz
tmin: 2023-05-27T08:39:02.0000050
tmax: 2023-05-27T08:59:34.2445748
fmin: 19.921875 MHz
fmax: 57.421875 MHz
beam: 0
dispersion_measure: None
rotation_measure: None
rebin_dt: None
rebin_df: None
remove_channels: None
dreambeam_skycoord: None
dreambeam_dt: None
dreambeam_parallactic: True
stokes: I
ignore_volume_warning: False
Pipeline parameter modification
The parameters may be modified in the same way as the entries of a dictionary.
See their list and description in the documentation of get().
Some checks are made to ensure that the values are correctly formatted and/or
relevant to the loaded dataset.
For instance, the time and frequency range of the data selection to be applied
(the “step-0” of the pipeline) can be defined with or without astropy objects:
>>> from astropy.time import Time
>>> import astropy.units as u
>>> sp.pipeline.parameters["tmin"] = "2023-05-27T08:40:00"
>>> sp.pipeline.parameters["tmax"] = Time("2023-05-27 08:42:00", format="iso")
>>> sp.pipeline.parameters["fmin"] = 50
>>> sp.pipeline.parameters["fmax"] = 55*u.MHz
Note
The user may also pass pipeline parameters as arguments when calling
get(). This is convenient for quick modifications
and does not affect later calls, since these values are forgotten after use
(unlike those set through parameters).
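The difference between stored parameters and call-time arguments described in the note above can be pictured with plain dictionaries. This is a conceptual sketch only, not nenupy's implementation; effective_parameters is a hypothetical name.

```python
# Conceptual sketch only (not nenupy's implementation): call-time
# arguments override the stored parameters without modifying them.
stored = {"stokes": "I", "tmin": "2023-05-27T08:40:00"}


def effective_parameters(stored_parameters, **overrides):
    """Return a new dict; the stored parameters are left untouched."""
    return {**stored_parameters, **overrides}


# Parameters used for one call only.
used = effective_parameters(stored, tmin="2023-05-27T08:41:30")

# The stored value is unchanged and applies again to the next call.
next_call = effective_parameters(stored)
```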
Managing pipeline tasks
Tasks may be removed from the pipeline using the remove()
method, taking as input the index of the task in the pipeline list:
>>> sp.pipeline.remove(2) # remove the (2 - Rebin in time) task
>>> sp.pipeline.info()
Pipeline configuration:
0 - Correct bandpass
(1 - Remove subband channels)
(2 - Rebin in frequency)
3 - Compute Stokes parameters
Conversely, TFTask objects may be added using the
insert() or append()
methods:
>>> from nenupy.io.tf import TFTask
>>> sp.pipeline.insert(TFTask.time_rebin(), 1)
>>> sp.pipeline.info()
Pipeline configuration:
0 - Correct bandpass
(1 - Rebin in time)
(2 - Remove subband channels)
(3 - Rebin in frequency)
4 - Compute Stokes parameters
Warning
The order in which the TFTask objects are listed is
the order in which they are called in the pipeline. It is therefore crucial to make sure that
a given task can ingest the data produced by the previous tasks.
For instance, it would make no sense to configure the channel removal task
after the rebinning in frequency. It would even result in an error since the
data shape would not match what the channel removal task expects.
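The following toy example illustrates why the order matters. It is a conceptual sketch only: plain Python lists stand in for the data, and the two functions are hypothetical stand-ins for the channel removal and frequency rebinning tasks, not nenupy code.

```python
# Conceptual sketch only: none of this is nenupy code.

def remove_edge_channels(spectrum, channels_per_subband, n_edge):
    """Drop n_edge channels at both ends of every subband."""
    if len(spectrum) % channels_per_subband != 0:
        raise ValueError("spectrum is not made of whole subbands")
    kept = []
    for start in range(0, len(spectrum), channels_per_subband):
        subband = spectrum[start:start + channels_per_subband]
        kept.extend(subband[n_edge:channels_per_subband - n_edge])
    return kept


def rebin_frequency(spectrum, factor):
    """Average groups of `factor` adjacent channels, dropping any remainder."""
    usable = len(spectrum) - len(spectrum) % factor
    return [sum(spectrum[i:i + factor]) / factor for i in range(0, usable, factor)]


spectrum = [float(i) for i in range(16)]  # 2 subbands of 8 channels each

# Valid order: remove the edges while the subband layout is still intact.
good = rebin_frequency(remove_edge_channels(spectrum, 8, 1), 3)

# Invalid order: after rebinning, the subband layout is lost.
try:
    remove_edge_channels(rebin_frequency(spectrum, 3), 8, 1)
    order_error = None
except ValueError as error:
    order_error = str(error)
```

In the second case the rebinned spectrum no longer contains a whole number of subbands, so the edge removal has nothing consistent to work on and raises an error.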
Special pipeline tasks
The TFTask correct_polarization() is a powerful task that must be applied to the \(X\) and \(Y\) component matrices (i.e., before converting the data to Stokes parameters, for instance).
It aims at correcting the data for beam effects such as gain variation, projection, and parallactic angle evolution across the source tracking.
To enable these corrections, the external module DreamBeam is required (it is not installed by default with nenupy, as it relies on specialized software that is more platform-dependent).
The NenuFAR antenna beam pattern is implemented in DreamBeam, and one can generate Jones matrices at various times and frequencies for a given observation (see also compute_jones_matrices() for direct access to these Jones matrices).
The following example demonstrates a typical setting for this task, assuming DreamBeam is properly installed.
The task correct_polarization() is inserted in the pipeline just after the bandpass correction and before doing anything else.
The task parameters specify that Jones matrices will be generated and applied to the data every calib_dt for the celestial coordinates skycoord (here, the mean position of the Sun during the time selection), and that the parallactic angle should be de-rotated (dreambeam_parallactic=True):
>>> from nenupy.io.tf import TFTask
>>> from nenupy.astro.target import SolarSystemTarget
>>> import astropy.units as u
>>> sp.pipeline.set_default()
>>> sp.pipeline.insert(TFTask.correct_polarization(), 1)
>>> mean_time = selected_time_min + (selected_time_max - selected_time_min) / 2
>>> sun = SolarSystemTarget.from_name("Sun", mean_time).coordinates
>>> data_i_corr = sp.get(
tmin=selected_time_min,
tmax=selected_time_max,
fmin=selected_frequency_min,
fmax=selected_frequency_max,
stokes="I",
calib_dt=10 * u.s,
skycoord=sun,
dreambeam_parallactic=True
)
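To picture what such a correction does to the X and Y components, the sketch below applies a made-up 2x2 Jones matrix to a single sample and then undoes it with the inverse matrix. It is a conceptual illustration only: the helper names and the numbers are invented for the example, and DreamBeam and nenupy may proceed differently in practice.

```python
# Conceptual sketch only: this is neither DreamBeam's nor nenupy's code.
# A 2x2 Jones matrix J maps the true (X, Y) signal to the measured one;
# the correction applies the inverse of J to the measured signal.

def apply_jones(jones, x, y):
    """Multiply the 2x2 matrix `jones` by the column vector (x, y)."""
    (a, b), (c, d) = jones
    return a * x + b * y, c * x + d * y


def invert_jones(jones):
    """Return the inverse of a 2x2 matrix given as nested tuples."""
    (a, b), (c, d) = jones
    det = a * d - b * c
    return (d / det, -b / det), (-c / det, a / det)


def stokes_i(x, y):
    """Total intensity of a single (X, Y) sample."""
    return abs(x) ** 2 + abs(y) ** 2


true_x, true_y = 1 + 0j, 0 + 1j                    # made-up sample
jones = ((0.8 + 0j, 0.1j), (0.05 + 0j, 0.5 + 0j))  # made-up beam response

measured = apply_jones(jones, true_x, true_y)
corrected = apply_jones(invert_jones(jones), *measured)

print(stokes_i(*measured), stokes_i(*corrected))
```

Stokes I alone no longer carries the separate X and Y terms on which the matrix acts, which illustrates why the task has to run before the conversion to Stokes parameters.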
Adding custom steps
The TFTask class is flexible enough to let users
define their own data processing steps.
This is a more advanced operation, as it requires digging a little into the nenupy
code, but it is also very convenient for testing new methods without (or rather before)
having to update the source code.
Here is a basic example:
>>> from nenupy.io.tf import TFTask
>>> custom_task = TFTask(
name="my task - multiply the data by n_channels",
func=lambda data, channels: data*channels,
args_to_update=["channels"]
)
>>> sp.pipeline.insert(custom_task, 3)
>>> sp.pipeline.info()
Pipeline configuration:
0 - Correct bandpass
(1 - Rebin in time)
(2 - Remove subband channels)
3 - my task - multiply the data by n_channels
(4 - Rebin in frequency)
5 - Compute Stokes parameters
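When the processing is more than a one-liner, a named function may be easier to document and to check on a small input before it is inserted in the pipeline. The sketch below rewrites the lambda above as such a function; scale_by_channels is a hypothetical name, and the TFTask call is left as a comment because it needs nenupy and a loaded Spectra object.

```python
def scale_by_channels(data, channels):
    """Multiply the data by the value of the `channels` parameter."""
    return data * channels


# Quick sanity check on a scalar before touching real data.
check = scale_by_channels(2.0, channels=64)

# With nenupy available, the function is wrapped in the same way as the lambda:
# custom_task = TFTask(
#     name="my task - multiply the data by n_channels",
#     func=scale_by_channels,
#     args_to_update=["channels"],
# )
# sp.pipeline.insert(custom_task, 3)
```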
Running the pipeline
Once the tasks have been listed and the parameters set, the pipeline
can be run in one go by calling get().
At a minimum, this call selects the data based on the time and
frequency range defined in parameters, as well
as the numerical beam index.
Although Dask allows for operations on
large datasets, it is wise to consider the output volume and/or computing
resources that would be required for a given pipeline configuration.
>>> data = sp.get(stokes="I")
It is also possible to modify the pipeline parameters on the fly while calling
get(). Their values will however be forgotten once
the method returns:
>>> data = sp.get(stokes="I", tmin="2023-05-27T08:41:30")
Note
There is a hardcoded limit of 2 GB on the size of the data output (i.e., after rebinning and all other pipeline operations), to prevent memory issues.
Users willing to bypass this limit may explicitly ask for it using the ignore_volume_warning parameter of pipeline.
This parameter can easily be updated directly through the get() method:
>>> sp.get(
tmin="2023-05-27T08:40:00", tmax="2023-05-27T18:00:00",
ignore_volume_warning=True
)
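Before bypassing the limit, it can help to estimate the output volume by hand. The sketch below does so from the time and frequency selection; it assumes one 4-byte float per time-frequency sample and 1 GB = 1e9 bytes, neither of which is taken from nenupy, so the result is only an order of magnitude. The function name is hypothetical.

```python
# Rough estimate only. Assumptions (not taken from nenupy): one 4-byte
# float per time-frequency sample and 1 GB = 1e9 bytes.
from datetime import datetime

BYTES_PER_VALUE = 4  # assumed


def output_volume_gb(t_start, t_stop, dt_s, f_min_hz, f_max_hz, df_hz, n_stokes=1):
    """Estimate the output size in GB for a time and frequency selection."""
    n_times = (t_stop - t_start).total_seconds() / dt_s
    n_channels = (f_max_hz - f_min_hz) / df_hz
    return n_times * n_channels * n_stokes * BYTES_PER_VALUE / 1e9


# Values from the info() output shown earlier (times rounded to the second).
t_start = datetime(2023, 5, 27, 8, 39, 2)
t_stop = datetime(2023, 5, 27, 8, 59, 34)

native = output_volume_gb(
    t_start, t_stop, dt_s=0.02097152,
    f_min_hz=19.921875e6, f_max_hz=57.421875e6, df_hz=3051.7578125,
)
# Same selection rebinned to 1 s, under the same assumptions.
rebinned = output_volume_gb(
    t_start, t_stop, dt_s=1.0,
    f_min_hz=19.921875e6, f_max_hz=57.421875e6, df_hz=3051.7578125,
)
print(f"native: {native:.2f} GB, rebinned to 1 s: {rebinned:.3f} GB")
```

Under these assumptions, the whole example file at native resolution comes out near 2.9 GB for a single Stokes parameter, above the 2 GB limit, whereas rebinning to 1 s brings it down to about 0.06 GB.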
Saving the data
The result of the pipeline operation may also be saved in an HDF5 file if the
argument file_name is provided.
The saved data volume may be larger than the available memory.
>>> sp.get(
file_name="/my/path/filename.hdf5",
stokes="I",
tmin="2023-05-27T08:41:30"
)