#! /usr/bin/python3
# -*- coding: utf-8 -*-


import numpy as np
import copy
from typing import Union
import logging
log = logging.getLogger(__name__)

from astropy.coordinates import SkyCoord, EarthLocation, ICRS, AltAz
from astropy.time import Time
import astropy.units as u
from astropy.visualization.wcsaxes.frame import EllipticalFrame
from astropy.wcs import WCS
from reproject import reproject_from_healpix
import dask.array as da
from dask.diagnostics import ProgressBar

import matplotlib.pyplot as plt
from matplotlib import patheffects
from matplotlib.colorbar import ColorbarBase
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import warnings
from import get_cmap
from matplotlib.ticker import LinearLocator
from matplotlib.colors import Normalize

    import healpy.pixelfunc as hpx
except ImportError:
    log.warning("Unable to load 'healpy', some functionalities may not be working.")
    hpx = None

from nenupy import nenufar_position, DummyCtMgr
from nenupy.astro.astro_tools import AstroObject

# ============================================================= #
# ----------------------- SkySliceBase ------------------------ #
# ============================================================= #
[docs] class SkySliceBase(AstroObject): """ """ def __init__(self, coordinates: SkyCoord, frequency: u.Quantity, time: Time, polarization: Union[str, float, int], value: np.ndarray, observer: EarthLocation = nenufar_position ): self.coordinates = coordinates self.time = time self.frequency = frequency self.polarization = polarization = observer self.value = value @property def visible_sky(self): """ """ altaz = self.horizontal_coordinates return altaz.alt.deg > 0 # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def plot(self, **kwargs): r""" Display the selected content of the :attr:`` attribute belonging to a :class:`` instance as a celestial map in equatorial coordinates. This method is available on a :class:`` instance, resulting from a selection upon a :class:`` instance (using the indexing operator). Several parameters, listed below, can be tuned to adapt the plot to the user requirements: .. rubric:: Data display keywords :param center: Coordinates of the celestial map to be displayed at the center of the image. Default is ``(RA=0deg, Dec=0deg)``. :type center: :class:`~astropy.coordinates.SkyCoord` :param radius: Angular radius from the center of the image above which the plot should be cropped. Default is ``None`` (i.e., full sky image). :type radius: :class:`~astropy.units.Quantity` :param resolution: Set the pixel resolution. The upper threshold is 0.775 deg, any value above that does not affect the figure appearence. Default is ``astropy.units.Quantity(1, unit="deg")``. :type resolution: :class:`~astropy.units.Quantity` :param only_visible: If set to ``True`` only the sky above the horizon is displayed. Setting this parameter to ``False`` does not really make sense for :class:`` instances representing antenna response for instance. Default is ``True``. :type only_visible: `bool` :param decibel: If set to ``True``, the data values are displayed at the decibel scale, i.e., :math:`10 \log( \rm{data} )`. Default is ``False``. :type decibel: `bool` :param interpolation: Matplotlib `imshow interpolation <>`_ method. Default is ``'quadric'``. :type interpolation: `str` .. rubric:: Overplot keywords :param scatter: Add a scatter plot (as defined in `matplotlib.pyplot.scatter`). Expected syntax is ``(<SkyCoord>, <marker_size>, <color>)``. Default is ``None`` (i.e., no scatter overplot). :type scatter: `tuple` :param text: Add a text overlay (as defined in `matplotlib.pyplot.text`). Expected syntax is ``(<SkyCoord>, <[text]>, <color>)``. Default is ``None`` (i.e., no text overplot). :type text: `tuple` :param contour: Add a contour plot (as defined in `matplotlib.pyplot.contour`). Expected syntax is ``(<numpy.ndarray>, <[levels]>, <colormap>)``. Default is ``None`` (i.e., no contour overplot). :type contour: `tuple` .. rubric:: Plotting layout keywords :param altaz_overlay: If set to ``True``, the horizontal coordinates grid is overplotted in addition to the equatorial one. Default is ``False``. :type altaz_overlay: `bool` :param cmap: Color map applied while representing the data (see `Matplotlib colormaps <>`_). Default is ``"YlGnBu_r"``. :type cmap: `str` :param show_colorbar: Show or not the color bar. Default is ``True``. :type show_colorbar: `bool` :param colorbar_label: Set the label of the color bar. Default is ``""``. :type colorbar_label: `str` :param figname: Name of the file (absolute or relative path) to save the figure. If set to ``"return"``, the method returns the `tuple` ``(fig, ax)`` (as defined by `matplotlib <>`_). Default is ``None`` (i.e., only show the figure). :type figname: `str` :param figsize: Set the figure size. Default is ``(15, 10)``. :type figsize: `tuple` :param ticks_color: Set the color of the equatorial grid and the Right Ascension ticks. Default is ``"0.9"`` (grey). :type ticks_color: `str` :param title: Set the figure title. Default is ``"<time>, <frequency>"``. :type title: `str` """ # Parsing the keyword arguments resolution = kwargs.get("resolution", 1*u.deg) figname = kwargs.get("figname", None) cmap = kwargs.get("cmap", "YlGnBu_r") figsize = kwargs.get("figsize", (15, 10)) center = kwargs.get("center", SkyCoord(0*u.deg, 0*u.deg)) radius = kwargs.get("radius", None) ticks_color = kwargs.get("ticks_color", "0.9") colorbar_label = kwargs.get("colorbar_label", "") title = kwargs.get("title", f"{self.time.isot.split('.')[0]}, {self.frequency:.2f}") visible_sky = kwargs.get("only_visible", True) decibel = kwargs.get("decibel", False) altaz_overlay = kwargs.get("altaz_overlay", False) alpha = kwargs.get('alpha', None) # Initialize figure wcs, shape = self._compute_wcs( center=center, resolution=getattr(self, "resolution", resolution), radius=radius ) fig = plt.figure(figsize=figsize) ax = plt.subplot( projection=wcs, frame_class=EllipticalFrame ) # Get the data projected on fullsky data = self._fullsky_projection( wcs=wcs, shape=shape, display_visible_sky=visible_sky ) if alpha is not None: # Use additional data stored in alpha argument to overlap an oppacity mask alpha_image = self._fullsky_projection( wcs=wcs, shape=shape, display_visible_sky=False, value=alpha ) alpha_image[np.isnan(alpha_image)] = 0 alpha_image /= alpha_image.max() # Scale the data in decibel if decibel: data = 10 * np.log10(data) vmin = kwargs.get("vmin", np.nanmin(data)) vmax = kwargs.get("vmax", np.nanmax(data)) # Plot the data im = ax.imshow( data, origin="lower", interpolation=kwargs.get("interpolation", "quadric"), cmap=cmap, vmin=vmin, vmax=vmax, alpha=None if alpha is None else alpha_image ) # Define ax ticks ax.coords.grid(color=ticks_color, alpha=0.5) path_effects=[patheffects.withStroke(linewidth=3, foreground='black')] ra_axis = ax.coords[0] dec_axis = ax.coords[1] ra_axis.set_ticks_visible(False) ra_axis.set_ticklabel_visible(True) ra_axis.set_ticklabel(color=ticks_color, exclude_overlapping=True, path_effects=path_effects) ra_axis.set_axislabel("RA", color=ticks_color, path_effects=path_effects) ra_axis.set_major_formatter("d") ra_axis.set_ticks(number=12) dec_axis.set_ticks_visible(False) dec_axis.set_ticklabel_visible(True) dec_axis.set_axislabel("Dec", minpad=2) dec_axis.set_major_formatter("d") dec_axis.set_ticks(number=10) if altaz_overlay: frame = AltAz(obstime=self.time, overlay = ax.get_coords_overlay(frame) overlay.grid(color="tab:orange", alpha=0.5) az_axis = overlay[0] alt_axis = overlay[1] az_axis.set_axislabel("Azimuth", color=ticks_color, path_effects=path_effects) az_axis.set_ticks_visible(False) az_axis.set_ticklabel_visible(True) az_axis.set_ticklabel(color=ticks_color, path_effects=path_effects) az_axis.set_major_formatter("d") az_axis.set_ticks(number=12) alt_axis.set_axislabel("Elevation") alt_axis.set_ticks_visible(False) alt_axis.set_ticklabel_visible(True) alt_axis.set_major_formatter("d") alt_axis.set_ticks(number=10) # Add NSEW points nesw_labels = np.array(["N", "E", "S", "W"]) nesw = SkyCoord( np.array([0, 90, 180, 270]), np.array([0, 0, 0, 0]), unit="deg", frame=frame ).transform_to(ICRS) for label, coord in zip(nesw_labels, nesw): ax.text( x=coord.ra.deg, y=coord.dec.deg, s=label, color="tab:orange", transform=ax.get_transform("world"), path_effects=path_effects, verticalalignment="center", horizontalalignment="center", clip_on=True ) # Colorbar if kwargs.get("show_colorbar", True): cax = inset_axes( ax, width='3%', height='100%', loc='lower left', bbox_to_anchor=(1.05, 0., 1, 1), bbox_transform=ax.transAxes, borderpad=0, ) with warnings.catch_warnings(): # Catche Deprecation warning of get_map (ginore because we are mpl<3.7) warnings.simplefilter("ignore") cb = ColorbarBase( cax, cmap=get_cmap(name=cmap), orientation='vertical', norm=Normalize( vmin=vmin, vmax=vmax ), ticks=LinearLocator() ) cb.solids.set_edgecolor("face") cb.set_label(colorbar_label) cb.formatter.set_powerlimits((0, 0)) # Overplot # if kwargs.get("circle", None) is not None: # from matplotlib.patches import Circle # frame = AltAz(obstime=self.time, # c = Circle( # (0, 75), # 20, # edgecolor='yellow', # linewidth=5, # facecolor='none', # #transform=ax.get_transform('world') # #transform=ax.get_transform('fk5') # transform=ax.get_transform(frame) # ) # ax.add_patch(c) if kwargs.get("moc", None) is not None: # In order fo that to work; I had to comment #axis_viewport.set(ax, wcs) # from add_patches_to_mpl_axe() in mocpy/moc/plot/ # OR re-set the limits (done here) try: frame = AltAz(obstime=self.time, xlimits = ax.get_xlim() ylimits = ax.get_ylim() mocs = kwargs["moc"] if isinstance(kwargs["moc"], list) else [kwargs["moc"]] for moc, color in zip(mocs, ["tab:red", "tab:green"]): moc.fill( ax=ax, wcs=wcs, alpha=0.5, fill=True, color=color, linewidth=0, ) ax.set_xlim(xlimits) ax.set_ylim(ylimits) except AttributeError: log.warning("A 'MOC' object, generated from mocpy is expected.") raise if kwargs.get("altaz_moc", None) is not None: xlimits = ax.get_xlim() ylimits = ax.get_ylim() altaz = self.horizontal_coordinates mask = kwargs["altaz_moc"].contains(, altaz.alt) ax.scatter( x=self.coordinates[mask].ra.deg, y=self.coordinates[mask].dec.deg, s=0.1,#[marker_size]*coords.size, facecolor="red", edgecolor=None, alpha=0.5, transform=ax.get_transform("world") ) ax.set_xlim(xlimits) ax.set_ylim(ylimits) if kwargs.get("scatter", None) is not None: parameters = kwargs["scatter"] if len(parameters) != 3: raise ValueError( "'scatter' syntax should be: (<SkyCoord>, <size>, <color>)" ) coords = parameters[0] if coords.isscalar: coords = coords.reshape((1,)) marker_size = parameters[1] marker_color = parameters[2] ax.scatter( x=coords.ra.deg, y=coords.dec.deg, s=[marker_size]*coords.size, color=marker_color, transform=ax.get_transform("world") ) if kwargs.get("text", None) is not None: parameters = kwargs["text"] if len(parameters) != 3: raise ValueError( "'text' syntax should be: (<SkyCoord>, <[text]>, <color>)" ) coords = parameters[0] if coords.isscalar: coords = coords.reshape((1,)) text = parameters[1] text_color = parameters[2] for i in range(coords.size): ax.text( x=coords[i].ra.deg, y=coords[i].dec.deg, s=text[i], color=text_color, transform=ax.get_transform("world"), clip_on=True ) if kwargs.get("contour", None) is not None: parameters = kwargs["contour"] data = parameters[0] if len(parameters) != 3: raise ValueError( "'contour' syntax should be: (<numpy.ndarray>, <[levels]>, <colormap>)" ) contour, _ = reproject_from_healpix( (data, ICRS()), wcs, nested=False, shape_out=shape#(ndec, nra) ) cont = ax.contour( contour, levels=parameters[1], cmap=parameters[2], ) ax.clabel(cont, inline=1, fontsize=10) # Other im.set_clip_path(ax.coords.frame.patch) ax.set_title(title, pad=20) # Save or show if figname is None: elif figname.lower() == "return": return fig, ax else: fig.savefig( figname, dpi=300, transparent=True, bbox_inches='tight' ) plt.close('all')
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------- SkySlice -------------------------- # # ============================================================= #
[docs] class SkySlice(SkySliceBase): """ """ def __init__(self, coordinates: SkyCoord, frequency: u.Quantity, time: Time, polarization: Union[str, float, int], value: np.ndarray, observer: EarthLocation = nenufar_position ): super().__init__( coordinates=coordinates, time=time, frequency=frequency, polarization=polarization, observer=observer, value=value ) # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod def _compute_wcs(center: SkyCoord, resolution: u.Quantity, radius: u.Quantity): """ """ dangle = 0.675 scale = int(dangle/ #scale = int( scale = 1 if scale <= 1 else scale ra_dim = 480*scale dec_dim = 240*scale if radius is not None: resol = dangle/scale ra_dim = int(2 * / resol) dec_dim = ra_dim #raauto = False wcs = WCS(naxis=2) wcs.wcs.crpix = [ra_dim/2 + 0.5, dec_dim/2 + 0.5] wcs.wcs.cdelt = np.array([-dangle/scale, dangle/scale]) wcs.wcs.crval = [center.ra.deg, center.dec.deg] wcs.wcs.ctype = ['RA---AIT', 'DEC--AIT'] return wcs, (ra_dim, dec_dim) def _fullsky_projection(self, wcs: WCS, shape: tuple, display_visible_sky: bool, value: np.ndarray = None): """ """ x, y = wcs.world_to_pixel(self.coordinates) data = np.zeros(shape, dtype=np.float64) data[:, :] = np.nan weights = np.zeros(shape, dtype=int) x_int = np.floor(x).astype(int) x_in_image = (x_int >= 0) & (x_int < shape[0]) y_int = np.floor(y).astype(int) y_in_image = (y_int >= 0) & (y_int < shape[1]) in_image_mask = x_in_image & y_in_image x_int = x_int[in_image_mask] y_int = y_int[in_image_mask] if value is None: values = copy.deepcopy(self.value) else: values = da.from_array(value) if display_visible_sky: values[~self.visible_sky] = np.nan values = values[in_image_mask] if isinstance(values, da.Array): with ProgressBar() if log.getEffectiveLevel() <= logging.INFO else DummyCtMgr(): values = values.compute() data[(x_int, y_int)] = 0., (x_int, y_int), 1) weights[weights<0.5] = 1., (x_int, y_int), values) data[(x_int, y_int)] /= weights[(x_int, y_int)] return data.T
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------ HpxSkySlice ------------------------ # # ============================================================= #
[docs] class HpxSkySlice(SkySliceBase): """ """ def __init__(self, coordinates: SkyCoord, frequency: u.Quantity, time: Time, polarization: Union[str, float, int], value: np.ndarray, observer: EarthLocation = nenufar_position ): super().__init__( coordinates=coordinates, time=time, frequency=frequency, polarization=polarization, observer=observer, value=value ) # --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod def _compute_wcs(center: SkyCoord, resolution: u.Quantity, radius: u.Quantity = None): """ """ dangle = 0.675 scale = int(dangle/ scale = 1 if scale <= 1 else scale ra_dim = 480*scale dec_dim = 240*scale if radius is not None: resol = dangle/scale ra_dim = int(2 * / resol) dec_dim = ra_dim #raauto = False wcs = WCS(naxis=2) wcs.wcs.crpix = [ra_dim/2 + 0.5, dec_dim/2 + 0.5] wcs.wcs.cdelt = np.array([-dangle/scale, dangle/scale]) wcs.wcs.crval = [center.ra.deg, center.dec.deg] wcs.wcs.ctype = ['RA---AIT', 'DEC--AIT'] return wcs, (dec_dim, ra_dim) def _fullsky_projection(self, wcs: WCS, shape: tuple, display_visible_sky: bool, value: np.ndarray = None): """ """ if value is None: values = copy.deepcopy(self.value) else: values = da.from_array(value) if display_visible_sky: values[~self.visible_sky] = np.nan if isinstance(values, da.Array): with ProgressBar() if log.getEffectiveLevel() <= logging.INFO else DummyCtMgr(): values = values.compute() with np.errstate(invalid='ignore'): # Ignore the invalid value in bilinear_interpolation (astropy-healpix) array, _ = reproject_from_healpix( (values, ICRS()), wcs, nested=False, shape_out=shape ) return array
# ============================================================= # # ============================================================= # # ============================================================= # # ---------------------------- Sky ---------------------------- # # ============================================================= #
[docs] class Sky(AstroObject): """ """ def __init__(self, coordinates: SkyCoord, time: Time =, frequency: u.Quantity = 50*u.MHz, polarization: np.ndarray = np.array([0]), value: Union[float, np.ndarray] = 0., observer: EarthLocation = nenufar_position ): self.coordinates = coordinates self.time = time self.frequency = frequency self.polarization = polarization = observer self.value = value def __str__(self): text = ( f"{self.__class__} instance\n" f"value: {self.shape}\n" f"\t* time: {self.time.shape}\n" f"\t* frequency: {self.frequency.shape}\n" f"\t* polarization: {self.polarization.shape}\n" f"\t* coordinates: {self.coordinates.shape}\n" ) return text def __truediv__(self, other): if isinstance(other, Sky): self.value /= other.value else: self.value /= other return self def __mul__(self, other): new_sky = copy.copy(self) if isinstance(other, Sky): new_sky.value *= other.value else: new_sky.value *= other return new_sky def __getitem__(self, n): """ """ val = self.value[n] if val.ndim != 1: raise IndexError( "<class 'HpxSky'>: wrong index selection on <arg " f"'value'> of shape {self.value.shape} (time, " "frequency, healpix_cells). A 1D array is " "expected as a result of the selection." ) return SkySlice( coordinates=self.coordinates, value=val, time=self.time[n[0]], frequency=self.frequency[n[1]], polarization=self.polarization[n[2]], ) # --------------------------------------------------------- # # --------------------- Getter/Setter --------------------- # @property def value(self): """ """ return self._value @value.setter def value(self, v): expected_shape = ( self.time.size, self.frequency.size, self.polarization.size, self.coordinates.size ) if np.isscalar(v): v *= np.ones(expected_shape) else: if v.shape != expected_shape: raise ValueError(f"Shape incorrect, expected {expected_shape}, got {v.shape}.") if v.dtype < np.float64: v = v.astype(np.float64) self._value = v @property def time(self): """ """ return self._time @time.setter def time(self, t): if t.isscalar: t = t.reshape((1,)) self._time = t @property def frequency(self): """ """ return self._frequency @frequency.setter def frequency(self, f): if f.isscalar: f = f.reshape((1,)) self._frequency = f @property def polarization(self): """ """ return self._polarization @polarization.setter def polarization(self, p): if np.ndim(p) == 0: p = np.array([p]) self._polarization = p @property def shape(self): """ """ return self.value.shape # @property # def visible_sky(self): # """ """ # altaz = self.horizontal_coordinates # return altaz.alt.deg > 0 @property def visible_mask(self): """ """ mask = self.horizontal_coordinates.alt.deg > 0 mask = np.expand_dims(mask, (1, 2)) # add frequency and polarization mask = np.repeat(mask, self.frequency.size, axis=1) mask = np.repeat(mask, self.polarization.size, axis=2) return mask # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] def compute_lmn(self, phase_center: SkyCoord, coordinate_mask: np.ndarray = None): r""" (l, m, n) image domain coordinates computed from HEALPix equatorial coordinates (in Right-Ascension :math:`\alpha` and Declination :math:`\delta`, see :attr:``) with respect to the ``phase_center`` (of equatorial coordinates :math:`\alpha_0`, :math:`\delta_0`). .. math:: \cases{ l = \cos(\delta) \sin( \Delta \alpha)\\ m = \sin(\delta) \cos(\delta_0) - \cos(\delta) \sin(\delta_0) \cos(\Delta \alpha)\\ n = \sqrt{ 1 - l^2 - m^2 } } where :math:`\Delta \alpha = \alpha - \alpha_0`. :param phase_center: Image phase center. :type phase_center: :class:`~astropy.coordinates.SkyCoord` :param coordinate_mask: Mask applied to coordinates before computing (l,m,n) values. :type coordinate_mask: :class:`~numpy.ndarray` :returns: (l, m, n) :rtype: `tuple` of 3 :class:`~numpy.ndarray` """ ra = self.coordinates[coordinate_mask].ra.rad dec = self.coordinates[coordinate_mask].dec.rad ra_0 = phase_center.ra.rad dec_0 = phase_center.dec.rad ra_delta = ra - ra_0 # ra_delta = ra_0 - ra l = np.cos(dec)*np.sin(ra_delta) m = np.sin(dec)*np.cos(dec_0) -\ np.cos(dec)*np.sin(dec_0)*np.cos(ra_delta) n = np.sqrt(1 - l**2 - m**2) return l, m, n
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # # ============================================================= # # ============================================================= # # ============================================================= # # -------------------------- HpxSky --------------------------- # # ============================================================= #
[docs] class HpxSky(Sky): """ """ def __init__(self, resolution: u.Quantity = 1*u.deg, time: Time =, frequency: u.Quantity = 50*u.MHz, polarization: np.ndarray = np.array([0]), value: Union[float, np.ndarray] = 0., observer: EarthLocation = nenufar_position ): if hpx is None: log.error( f"Unable to create an instance of {self.__qualname__} since 'healpy' does not work." ) self.nside, self.resolution = self._resol2nside(resolution=resolution) # Construct the Healpix coordinates map ra, dec = hpx.pix2ang( nside=self.nside, ipix=np.arange( hpx.nside2npix(self.nside), dtype=np.int64 ), lonlat=True, nest=False ) super().__init__( coordinates=SkyCoord(ra, dec, unit="deg"), time=time, frequency=frequency, polarization=polarization, value=value, observer=observer ) def __getitem__(self, n): """ """ val = self.value[n] if val.ndim != 1: raise IndexError( "<class 'HpxSky'>: wrong index selection on <arg " f"'value'> of shape {self.value.shape} (time, " "frequency, healpix_cells). A 1D array is " "expected as a result of the selection." ) return HpxSkySlice( coordinates=self.coordinates, value=val, time=self.time[n[0]], frequency=self.frequency[n[1]], polarization=self.polarization[n[2]], ) # --------------------------------------------------------- # # ------------------------ Methods ------------------------ #
[docs] @classmethod def shaped_like(cls, other): """ """ if not isinstance(other, HpxSky): raise TypeError( f"{cls.__class__} instance expected." ) return cls( resolution=other.resolution, time=other.time, frequency=other.frequency, polarization=other.polarization, )
# --------------------------------------------------------- # # ----------------------- Internal ------------------------ # @staticmethod def _resol2nside(resolution: u.Quantity): """ Returns the HEALPix nside and effective resolution. """ # Get all nsides for all HEALPix oders healpix_nsides = hpx.order2nside(np.arange(30)) # Convert them into angular resolutions available_resolutions = hpx.nside2resol( healpix_nsides, arcmin=True )*u.arcmin # Find the index of the closest matching HEALPix resolution order_index = np.argmin( np.abs(available_resolutions - resolution) ) # Retrieve the corresponding nside and reoslution nside = healpix_nsides[order_index] effective_resolution = available_resolutions[order_index] return nside, effective_resolution
# ============================================================= # # ============================================================= #