Source code for nenupy.astro.uvw

#! /usr/bin/python3
# -*- coding: utf-8 -*-


"""
    ************
    UVW Coverage
    ************
"""


__author__ = "Alan Loh"
__copyright__ = "Copyright 2022, nenupy"
__credits__ = ["Alan Loh"]
__maintainer__ = "Alan"
__email__ = "alan.loh@obspm.fr"
__status__ = "Production"
__all__ = [
    "compute_uvw"
]


import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from mpl_toolkits.axes_grid1.inset_locator import inset_axes

from astropy.coordinates import EarthLocation, SkyCoord, AltAz
from astropy.time import Time
from astropy.modeling import models, fitting
import astropy.units as u

from nenupy.instru.interferometer import Interferometer
from nenupy.astro import hour_angle, altaz_to_radec, wavelength
from nenupy import nenufar_position

import logging
log = logging.getLogger(__name__)


# ============================================================= #
# ------------------------ compute_uvw ------------------------ #
# ============================================================= #
[docs] def compute_uvw( interferometer: Interferometer, phase_center: SkyCoord = None, time: Time = Time.now(), observer: EarthLocation = nenufar_position ) -> u.Quantity: """ """ # Get the baselines in ITRF coordinates baselines_itrf = interferometer.baselines.bsl xyz = baselines_itrf[np.tril_indices(interferometer.size)].T #xyz = np.array(baselines_itrf).T log.info(f"Computing UVW (time steps: {time.size}, baselines: {xyz.shape[0]})...") # Select zenith phase center if nothing is provided if phase_center is None: log.debug("Default zenith phase center selected.") zenith = SkyCoord( np.zeros(time.size), np.ones(time.size)*90, unit="deg", frame=AltAz( obstime=time, location=observer ) ) phase_center = altaz_to_radec(zenith) center_dec_rad = phase_center.dec.rad if np.isscalar(center_dec_rad): center_dec_rad = np.repeat(center_dec_rad, time.size) # Compute the hour angle of the phase center lha = hour_angle( radec=phase_center, time=time, observer=observer, fast_compute=True ) lha_rad = lha.rad # Force the time to be an array if np.isscalar(lha_rad): lha_rad = np.array([lha_rad]) if time.isscalar: time = time.reshape((1,)) # celestial transformation # lat_rad = interferometer.position.lat.rad # cl = np.cos(lat_rad) # sl = np.sin(lat_rad) # transfo = np.array([ # [0, -sl, cl], # [1, 0, 0], # [0, cl, sl] # ]) # xyz = np.dot(np.moveaxis(transfo, -1, 0), xyz) # Compute UVW projection sr = np.sin(lha_rad) cr = np.cos(lha_rad) sd = np.sin(center_dec_rad) cd = np.cos(center_dec_rad) rot_uvw = np.array([ [ sr, cr, np.zeros(time.size)], [-sd*cr, sd*sr, cd], [ cd*cr, -cd*sr, sd] ]) # Project the baselines in the UVW frame uvw = - np.dot(np.moveaxis(rot_uvw, -1, 0), xyz) return np.moveaxis(uvw, -1, 1) * u.m
# ============================================================= # # ============================================================= # # ============================================================= # # ------------------------ compute_uvw ------------------------ # # ============================================================= # class UV_Coverage: """ """ def __init__(self, uvw: u.Quantity) -> None: # Getting the number of antennas used to compute the UVW n_baselines = uvw.shape[-2] # Solving n(n-1)/2 + n = n_baselines delta = 1 + 8*n_baselines # 2nd degree discriminant n_antennas = int( (-1 + np.sqrt(delta))/2 ) # only positive solution ant_1, ant_2 = np.tril_indices(n_antennas, 0) # Add the negative part (while not including auto-correlations twice) self.uvw = np.concatenate( (uvw, uvw[:, ant_1 != ant_2, :]*np.array([-1, -1, 1])), axis=1 ) # shape (time, bsl, 3) @property def uv_distance(self): """ """ return np.linalg.norm(self.uvw, axis=-1) @classmethod def compute(cls, interferometer: Interferometer, phase_center: SkyCoord = None, time: Time = Time.now(), observer: EarthLocation = nenufar_position ): """ """ uvw = compute_uvw( interferometer=interferometer, phase_center=phase_center, time=time, observer=observer ) return cls(uvw=uvw) def radial_profile(self, frequency: u.Quantity = None, plot: bool = True, **kwargs ): """ Plot the radial cut on the UV distribution kwargs step_width log figsize title """ if frequency is not None: wavel = wavelength(frequency) if wavel.isscalar: wavel = wavel.reshape((1,)) x_label = r"uv distance ($\lambda$)" uu = (self.uvw[..., 0][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) vv = (self.uvw[..., 1][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) else: x_label = "uv distance (m)" uu = self.uvw[..., 0].to(u.m).value vv = self.uvw[..., 1].to(u.m).value # Only work on the upper part of the plot since its symmetrical positive_v = vv > 0. uu = uu[positive_v] vv = vv[positive_v] uv_distance = np.sqrt(uu**2 + vv**2) average_distance = [] density = [] step_width = kwargs.get("step_width", 10) # lambda unit uv_distances_probed = np.arange( uv_distance.min(), uv_distance.max() + step_width, step_width ) for min_dist, max_dist in zip(uv_distances_probed[:-1], uv_distances_probed[1:]): mask = (uv_distance>=min_dist) & (uv_distance<=max_dist) avg_dist = np.mean([min_dist, max_dist]) average_distance.append( avg_dist ) density.append( uu[mask].size / avg_dist ) density = np.array(density)/np.max(density) # normalize average_distance = np.array(average_distance) # Fit # Ignore first points for the Gaussian fit start_index = np.argmax(density) # Perform a Gaussian fit gaussian_init = models.Gaussian1D( amplitude=1., mean=0, stddev=0.1 * max(average_distance),#0.68 * max(average_distance), bounds={'mean': (0., 0.)} ) fit_gaussian = fitting.LevMarLSQFitter() gaussian = fit_gaussian(gaussian_init, average_distance[start_index:], density[start_index:]) gstd = gaussian.stddev.value if plot: fig = plt.figure(figsize=kwargs.get("figsize", (10, 5))) ax = fig.add_subplot(1, 1, 1) ax.bar( average_distance, height=density, width=step_width, edgecolor="black", log=kwargs.get("log", False), linewidth=0.5 ) ax.set_xlabel(x_label) ax.set_ylabel("Density") ax.set_title(kwargs.get("title", "")) y_limits = ax.get_ylim() # Plot the fit x = np.linspace(min(average_distance), max(average_distance), 100) ax.plot( x, gaussian(x), linestyle=':', color='black', linewidth=2, label="Gaussian fit" ) plt.legend() ax.set_ylim(y_limits) # Save or show the figure figname = kwargs.get("figname", "") if figname != "": plt.savefig( figname, dpi=300, bbox_inches="tight", transparent=True ) log.info(f"Figure '{figname}' saved.") else: plt.show() plt.close("all") return average_distance, density, np.std((density, gaussian(average_distance)), axis=0) def azimuthal_profile(self, frequency: u.Quantity = None, plot: bool = True, **kwargs ): """ Plot the azimuthal cut ont the UV distribution """ if frequency is not None: wavel = wavelength(frequency) if wavel.isscalar: wavel = wavel.reshape((1,)) uu = (self.uvw[..., 0][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) vv = (self.uvw[..., 1][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) else: uu = self.uvw[..., 0].to(u.m).value vv = self.uvw[..., 1].to(u.m).value # Only work on the upper part of the plot since its symmetrical positive_v = vv > 0. uu = uu[positive_v] vv = vv[positive_v] uv_distance = np.sqrt(uu**2 + vv**2) uv_ang = np.degrees( np.arccos(uu/uv_distance) ) angle = [] density = [] step_angle = kwargs.get("step_angle", 5) angles_probed = np.arange( 0, 180 + step_angle, step_angle ) for min_ang, max_ang in zip(angles_probed[:-1], angles_probed[1:]): mask = (uv_ang>=min_ang) & (uv_ang<=max_ang) angle.append( np.mean([min_ang, max_ang]) ) density.append( uu[mask].size ) density = np.array(density)/np.max(density) mean_value = np.mean(density) angle = np.array(angle) if plot: fig = plt.figure(figsize=kwargs.get("figsize", (10, 5))) ax = fig.add_subplot(1, 1, 1) ax.bar( angle, height=density, width=step_angle, edgecolor="black", log=kwargs.get("log", False), linewidth=0.5 ) ax.set_xlabel("Azimuth (deg)") ax.set_ylabel("Density") ax.set_title(kwargs.get("title", "")) ax.axhline( mean_value, linestyle=":", color="black", linewidth=2, label="Median" ) plt.legend() # Save or show the figure figname = kwargs.get("figname", "") if figname != "": plt.savefig( figname, dpi=300, bbox_inches="tight", transparent=True ) log.info(f"Figure '{figname}' saved.") else: plt.show() plt.close("all") return angle, density, np.std((density, mean_value), axis=0) def psf(self, frequency: u.Quantity = None, grid_size: int = 1000, plot: bool = True, **kwargs ): """ """ if frequency is not None: wavel = wavelength(frequency) if wavel.isscalar: wavel = wavel.reshape((1,)) x_label = r"uv distance ($\lambda$)" uu = (self.uvw[..., 0][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) vv = (self.uvw[..., 1][:, None, :] / wavel[None, :, None]).to(u.dimensionless_unscaled) else: x_label = "uv distance (m)" uu = self.uvw[..., 0].to(u.m).value vv = self.uvw[..., 1].to(u.m).value # Remove auto-correlations auto_corr = (uu == 0.) * (vv == 0.) # Grid the UV coverage uv_grid, x_edges, y_edges = np.histogram2d( uu[..., ~auto_corr].ravel(), vv[..., ~auto_corr].ravel(), bins=grid_size ) # Fourrier transform psf = np.fft.fft2(uv_grid) psf = np.fft.fftshift(psf) # Plot if plot: fig = plt.figure(figsize=kwargs.get("figsize", (10, 10))) ax = fig.add_subplot(1, 1, 1) abs_psf = np.abs(psf) im = ax.imshow( abs_psf, origin="lower", cmap=kwargs.get("cmap", "Blues"), interpolation="nearest", norm=LogNorm(vmin=abs_psf.min(), vmax=abs_psf.max()) if kwargs.get("log", False) else None ) # Colorbar cax = inset_axes( ax, width='3%', height='100%', loc='lower left', bbox_to_anchor=(1.05, 0., 1, 1), bbox_transform=ax.transAxes, borderpad=0, ) cb = fig.colorbar(im, cax=cax) cb.solids.set_edgecolor("face") cb.set_label(kwargs.get("colorbar_label", "")) return psf def plot(self, frequency: u.Quantity = None, **kwargs) -> None: """ kwargs figsize gridsize cmap overlay_scatter colorbar_label title figname """ # Either plot raw (u, v) in meters, or, if the frequency is specified, # plots (u, v) in lambda units. if frequency is not None: wavel = wavelength(frequency) if wavel.isscalar: wavel = wavel.reshape((1,)) x_label = r"u ($\lambda$)" y_label = r"v ($\lambda$)" uu = self.uvw[..., 0][:, None, :] / wavel[None, :, None] vv = self.uvw[..., 1][:, None, :] / wavel[None, :, None] else: x_label = "u (m)" y_label = "v (m)" uu = self.uvw[..., 0] vv = self.uvw[..., 1] # Scale the haxagon grid, with respect to the data, in order to not # get deformed hexagons (since the plot is scaled equally in x and y). uv_min = min((uu.min().value, vv.min().value)) uv_max = max((uu.max().value, vv.max().value)) uv_width = np.abs(uv_min - uv_max) ax_min = uv_min - 0.05 * uv_width ax_max = uv_max + 0.05 * uv_width # Plot the hexagon bins fig = plt.figure(figsize=kwargs.get("figsize", (10, 10))) ax = fig.add_subplot(1, 1, 1) hexb = ax.hexbin( uu.value, vv.value, gridsize=kwargs.get("gridsize", 70), bins=None, xscale="linear", yscale="linear", extent=(ax_min, ax_max, ax_min, ax_max), cmap=kwargs.get("cmap", "Blues"), edgecolors="face", mincnt=1, norm=LogNorm()#vmin=Z.min(), vmax=Z.max()), ) if kwargs.get("overlay_scatter", False): ax.scatter(uu.value, vv.value, 2, alpha=0.5) # Colorbar cax = inset_axes( ax, width='3%', height='100%', loc='lower left', bbox_to_anchor=(1.05, 0., 1, 1), bbox_transform=ax.transAxes, borderpad=0, ) cb = fig.colorbar(hexb, cax=cax) cb.solids.set_edgecolor("face") cb.set_label(kwargs.get("colorbar_label", "Density")) # ax.set_facecolor("0.8") ax.set_xlabel(x_label) ax.set_ylabel(y_label) ax.set_aspect('equal', adjustable='datalim') ax.set_title(kwargs.get("title", "")) # Save or show the figure figname = kwargs.get("figname", "") if figname != "": plt.savefig( figname, dpi=300, bbox_inches="tight", transparent=True ) log.info(f"Figure '{figname}' saved.") else: plt.show() plt.close("all") @staticmethod def _prepare_plot(uvw: u.Quantity, frequency: u.Quantity) -> u.Quantity: """ """ return # ============================================================= # # ============================================================= #