Source code for plotastrodata.analysis_utils

import numpy as np
import warnings
from dataclasses import dataclass
from scipy.interpolate import RegularGridInterpolator as RGI
from scipy.signal import convolve
from typing import Any, Callable

from plotastrodata import const_utils as cu
from plotastrodata.coord_utils import coord2xy, rel2abs, xy2coord
from plotastrodata.fits_utils import data2fits, FitsData, Jy2K
from plotastrodata.fitting_utils import (EmceeCorner, gaussfit1d,
                                         gaussfit2d, gaussian2d)
from plotastrodata.matrix_utils import dot2d, Mfac, Mrot
from plotastrodata.noise_utils import estimate_rms
from plotastrodata.other_utils import (isdeg, nearest_index,
                                       RGIxy, RGIxyv, to4dim, trim)


[docs] def quadrantmean(data: np.ndarray, x: np.ndarray, y: np.ndarray, quadrants: str = '13' ) -> tuple[np.ndarray, np.ndarray, np.ndarray] | None: """Take mean between 1st and 3rd (or 2nd and 4th) quadrants. Args: data (np.ndarray): 2D array. x (np.ndarray): 1D array. First coordinate. y (np.ndarray): 1D array. Second coordinate. quadrants (str, optional): '13' or '24'. Defaults to '13'. Returns: tuple: Averaged (data, x, y). """ if np.ndim(data) != 2: print('data must be 2D.') return if quadrants not in ['13', '24']: print("quadrants must be '13' or '24'.") return dx = x[1] - x[0] dy = y[1] - y[0] nx = int(np.ceil(np.max(np.abs(x)) / dx)) ny = int(np.ceil(np.max(np.abs(y)) / dy)) xnew = np.linspace(-nx, nx, 2 * nx + 1) * dx ynew = np.linspace(-ny, ny, 2 * ny + 1) * dy s = 1 if quadrants == '13' else -1 f = RGI((y, s * x), data, bounds_error=False, fill_value=np.nan) datanew = f(np.meshgrid(ynew, xnew, indexing='ij')) datanew = (datanew + datanew[::-1, ::-1]) / 2. return datanew[ny:, nx:], xnew[nx:], ynew[ny:]
[docs] def filled2d(data: np.ndarray, x: np.ndarray, y: np.ndarray, n: int = 1, **kwargs: Any) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Fill 2D data, 1D x, and 1D y by a factor of n using RGI. Args: data (np.ndarray): 2D or 3D array. x (np.ndarray): 1D array. y (np.ndarray): 1D array. n (int, optional): How many times more the new grid is. Defaults to 1. Returns: tuple: The interpolated (data, x, y). """ xnew = np.linspace(x[0], x[-1], n * (len(x) - 1) + 1) ynew = np.linspace(y[0], y[-1], n * (len(y) - 1) + 1) d = RGIxy(y, x, data, np.meshgrid(ynew, xnew, indexing='ij'), **kwargs) return d, xnew, ynew
def _need_multipixels(method: Callable) -> Callable: def wrapper(cls: Any, *args: Any, **kwargs: Any) -> Any | None: singlepixel = cls.dx is None or cls.dy is None if singlepixel: print('No pixel size.') return return method(cls, *args, **kwargs) return wrapper
[docs] @dataclass class AstroData(): """Data to be processed and parameters for processing the data. Args: data (np.ndarray, optional): 2D or 3D array. Defaults to None. x (np.ndarray, optional): 1D array. Defaults to None. y (np.ndarray, optional): 1D array. Defaults to None. v (np.ndarray, optional): 1D array. Defaults to None. beam (np.ndarray, optional): [bmaj, bmin, bpa]. Defaults to [None, None, None]. fitsimage (str, optional): Input fits name. Defaults to None. Tb (bool, optional): True means the data array is brightness temperature. Defaults to False. sigma (float or str, optional): Noise level or method for measuring it. Defaults to 'hist'. center (str, optional): Text coordinates. 'common' means initialized value. Defaults to 'common'. restfreq (float, optional): Used for velocity and brightness temperature. Defaults to None. cfactor (float, optional): The data array is multiplied by cfactor. Defaults to 1. pvpa (float, optional): Position angle of the PV cut. Defaults to None. pv (bool, optional): True means the data array is a position-velocity diagram. Defaults to False. bunit (str, optional): The unit of the data array. Defaults to ''. """ data: np.ndarray | None = None x: np.ndarray | None = None y: np.ndarray | None = None v: np.ndarray | None = None beam: np.ndarray | tuple[None] = (None, None, None) fitsimage: str | None = None Tb: bool = False sigma: str | float | None = 'hist' center: str = 'common' restfreq: float | None = None cfactor: float = 1 pvpa: float | None = None pv: bool = False bunit: str = '' def __post_init__(self) -> None: n = 0 if self.fitsimage is not None: if not isinstance(self.fitsimage, list): n = 1 elif any(a is not None for a in self.fitsimage): n = len(self.fitsimage) else: n = 0 if n > 0: self.data = None if self.data is not None: if not isinstance(self.data, list): n = 1 elif any(a is not None for a in self.data): n = len(self.data) else: n = 0 if n == 0: print('Either data or fitsimage must be given.') self.n = n self.fitsimage_org = None self.sigma_org = None self.beam_org = None self.fitsheader = None def _binning_one(self, t: str, width: float) -> None: grid = getattr(self, t) if width == 1 or grid is None: return dt = f'd{t}' sep = getattr(self, dt) if sep is None: s = f'Skip binning in the {t}-axis because {dt} is None.' warnings.warn(s, UserWarning) return i = {'v': 1, 'y': 2, 'x': 3}[t] sizenew = self.size[i] // width self.size[i] = sizenew data = np.moveaxis(self.data, i, 0) datanew = np.moveaxis(np.zeros(self.size), i, 0) gridnew = np.zeros(sizenew) for start in range(width): stop = start + sizenew * width datanew += data[start:stop:width] gridnew += grid[start:stop:width] self.data = np.moveaxis(datanew, 0, i) / width setattr(self, t, gridnew / width) setattr(self, dt, sep * width)
[docs] def binning(self, width: list[int] = [1, 1, 1]) -> None: """Binning up neighboring pixels in the v, y, and x domain. Args: width (list, optional): Number of channels, y-pixels, and x-pixels for binning. Defaults to [1, 1, 1]. """ w = np.array([1] * (3 - len(width)) + list(width), dtype=int) if self.pv: w[1] = max(w[0], w[1]) w[2] = 1 self.data = to4dim(self.data) size = np.array(np.shape(self.data)) if np.any(w > size[1:]): w = np.minimum(w, size[1:]) ws = ', '.join([f'{s:d}' for s in w]) print(f'width was changed to [{ws}].') if (not self.pv and w[0] > 1) or (self.pv and w[1] > 1): width_v = w[1] if self.pv else w[0] print(f'sigma has been divided by sqrt({width_v:d})' + ' because of binning in the v-axis.') self.sigma = self.sigma / np.sqrt(width_v) if (not self.pv and w[1] > 1) or w[2] > 1: print('Binning in the x- or y-axis does not update sigma.') self.size = size for t, ww in zip(['v', 'y', 'x'], w): self._binning_one(t, ww) self.data = np.squeeze(self.data) del self.size if self.pv: self.v = self.y self.dv = self.dy
[docs] def centering(self, includexy: bool = True, includev: bool = False, **kwargs: Any) -> None: """Spatial regridding to set the center at (x,y,v)=(0,0,0). Args: includexy (bool, optional): Centering in the x and y directions at each channel. Defaults to True. includev (bool, optional): Centering in the v direction at each position. Defaults to False. """ if includexy: xnew = self.x - self.x[nearest_index(self.x)] ynew = self.y - self.y[nearest_index(self.y)] if includev: vnew = self.v - self.v[nearest_index(self.v)] if includexy and includev: self.data = RGIxyv(self.v, self.y, self.x, self.data, np.meshgrid(vnew, ynew, xnew, indexing='ij'), **kwargs) self.v, self.y, self.x = vnew, ynew, xnew elif includexy: self.data = RGIxy(self.y, self.x, self.data, np.meshgrid(ynew, xnew, indexing='ij'), **kwargs) self.y, self.x = ynew, xnew elif includev: nx, ny, nv = len(self.x), len(self.y), len(self.v) a = np.empty((ny, nx, nv)) for i in range(ny): for j in range(nx): f = RGI((self.v,), self.data[:, i, j], method='linear', bounds_error=False, fill_value=np.nan) a[i, j] = f(vnew) self.data = np.moveaxis(a, -1, 0) self.v = vnew else: print('No change because includexy=False and includev=False.')
[docs] @_need_multipixels def circularbeam(self) -> None: """Make the beam circular by convolving with 1D Gaussian """ if None in self.beam: print('No beam.') return bmaj, bmin, bpa = self.beam self.rotate(-bpa) nx = len(self.x) if len(self.x) % 2 == 1 else len(self.x) - 1 ny = len(self.y) if len(self.y) % 2 == 1 else len(self.y) - 1 y = np.linspace(-(ny-1) / 2, (ny-1) / 2, ny) * np.abs(self.dy) g1 = np.exp(-4 * np.log(2) * y**2 / (bmaj**2 - bmin**2)) e = np.sqrt(1 - bmin**2 / bmaj**2) g1 /= np.sqrt(np.pi / 4 / np.log(2) * bmin * e) g = np.zeros((ny, nx)) g[:, (nx - 1) // 2] = g1 d = self.data.copy() d[np.isnan(d)] = 0 self.data = np.squeeze(convolve(to4dim(d), [[g]], mode='same')) self.rotate(bpa) self.beam[1] = self.beam[0] self.beam[2] = 0
[docs] def deproject(self, pa: float = 0, incl: float = 0, **kwargs: Any) -> None: """Exapnd by a factor of 1/cos(incl) in the direction of pa+90 deg. Args: pa (float, optional): Position angle in the unit of degree. Defaults to 0. incl (float, optional): Inclination angle in the unit of degree. Defaults to 0. """ ci = np.cos(np.radians(incl)) A = np.linalg.multi_dot([Mrot(pa), Mfac(1, ci), Mrot(-pa)]) yxnew = dot2d(A, np.meshgrid(self.y, self.x, indexing='ij')) self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs) if None not in self.beam: bmaj, bmin, bpa = self.beam a, b = np.linalg.multi_dot([Mfac(1 / bmaj, 1 / bmin), Mrot(pa - bpa), Mfac(1, ci), Mrot(-pa)]).T alpha = (np.dot(a, a) + np.dot(b, b)) / 2 beta = np.dot(a, b) gamma = (np.dot(a, a) - np.dot(b, b)) / 2 bpa_new = np.arctan(beta / gamma) / 2 * np.degrees(1) if beta * bpa_new >= 0: bpa_new += 90 Det = np.sqrt(beta**2 + gamma**2) bmaj_new = 1 / np.sqrt(alpha - Det) bmin_new = 1 / np.sqrt(alpha + Det) self.beam = np.array([bmaj_new, bmin_new, bpa_new])
[docs] @_need_multipixels def fit2d(self, model: Callable, bounds: np.ndarray, progressbar: bool = False, kwargs_fit: dict = {}, kwargs_plotcorner: dict = {}, chan: int | None = None) -> dict[str, Any] | None: """Fit a given 2D model function to self.data. Default keyword values: kwargs_plotcorner: ``show=False`` and ``savefig=None``. User-supplied values in ``kwargs_plotcorner`` override these defaults. Args: model (function): The model function in the form of f(par, x, y). bounds (np.ndarray): bounds for fitting_utils.EmceeCorner. progressbar (bool, optional): progressbar for fitting_utils.EmceeCorner. Defaults to False. kwargs_fit (dict, optional): Arguments for fitting_utils.EmceeCorner.fit. kwargs_plotcorner (dict, optional): Arguments for fitting_utils.EmceeCorner.plotcorner. chan (int, optional): The channel number where the 2D model is fitted. Defaults to None. Returns: dict: The parameter sets (popt, plow, pmid, and phigh), the best 2D model array (model), and the residual from the model (residual). """ d = self.data if chan is None else self.data[chan] x, y = np.meshgrid(self.x, self.y) if None not in self.beam: Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2) pixelperbeam = Omega / np.abs(self.dx * self.dy) else: pixelperbeam = 1. s = 'sigma is multiplied by sqrt(pixel-per-beam)' \ + ' to consider the noise correlation in a beam.' \ + ' This correction is relatively conservative.' warnings.warn(s, UserWarning) def logl(p: np.ndarray) -> float: rss = np.nansum((model(p, x, y) - d)**2) return -0.5 * rss / self.sigma**2 / pixelperbeam mcmc = EmceeCorner(bounds=bounds, logl=logl, progressbar=progressbar) kwargs_fit0 = {} kwargs_fit0.update(kwargs_fit) mcmc.fit(**kwargs_fit0) kwargs_plotcorner0 = {'show': False, 'savefig': None} kwargs_plotcorner0.update(kwargs_plotcorner) kw_pl = kwargs_plotcorner0 if kw_pl['show'] or kw_pl['savefig'] is not None: mcmc.plotcorner(**kw_pl) popt = mcmc.popt plow = mcmc.plow pmid = mcmc.pmid phigh = mcmc.phigh modelopt = model(popt, x, y) residual = d - modelopt return {'popt': popt, 'plow': plow, 'pmid': pmid, 'phigh': phigh, 'model': modelopt, 'residual': residual}
[docs] @_need_multipixels def gaussfit2d(self, chan: int | None = None) -> dict: """Fit a 2D Gaussian function to self.data using fitting_utils.gaussfit2d(). Args: chan (int): The channel number where the 2D Gaussian is fitted. Defaults to None. Returns: dict: The best parameter set (popt), the error set (perr), the best 2D Gaussian array (model), the residual from the model (residual), and the coordinates of the best-fit center (center). """ z = self.data if chan is None else self.data[chan] Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2) pixelperbeam = Omega / np.abs(self.dx * self.dy) s = 'sigma is multiplied by sqrt(pixel-per-beam)' \ + ' to consider the noise correlation in a beam.' \ + ' This correction is relatively conservative.' warnings.warn(s, UserWarning) res = gaussfit2d(xdata=self.x, ydata=self.y, zdata=z, sigma=self.sigma * np.sqrt(pixelperbeam), show=False, nwalkersperdim=4) popt, perr = res['popt'], res['perr'] model = gaussian2d(np.meshgrid(self.x, self.y), *popt) residual = z - model if self.center is not None: newcenter = xy2coord(popt[1:3] / 3600, coordorg=self.center) else: newcenter = None return {'popt': popt, 'perr': perr, 'model': model, 'residual': residual, 'center': newcenter}
[docs] def histogram(self, **kwargs: Any) -> tuple[np.ndarray, np.ndarray]: """Output histogram of self.data using numpy.histogram. This method can take the arguments of numpy.histogram. Returns: tuple: (bins, histogram) """ hist, hbin = np.histogram(self.data[~np.isnan(self.data)], **kwargs) hbin = (hbin[:-1] + hbin[1:]) / 2 return hbin, hist
[docs] def mask(self, dataformask: np.ndarray | None = None, includepix: list[float, float] = [], excludepix: list[float, float] = []) -> None: """Mask self.data using a 2D or 3D array of dataformask. Args: dataformask (np.ndarray, optional): 2D or 3D array is used for specifying the mask. includepix (list, optional): Data in this range survives. Defaults to []. excludepix (list, optional): Data in this range is masked. Defaults to []. """ if dataformask is None: dataformask = self.data if np.ndim(self.data) > np.ndim(dataformask): print('The mask is broadcasted.') mask = np.full(np.shape(self.data), dataformask) else: mask = dataformask if np.shape(self.data) != np.shape(mask): print('The dataformask has a different shape.') return if len(includepix) == 2: self.data[(mask < includepix[0]) + (includepix[1] < mask)] = np.nan if len(excludepix) == 2: self.data[(excludepix[0] < mask) * (mask < excludepix[1])] = np.nan
def _gfit_profile(self, prof: list, gaussfit: bool) -> dict[str, Any]: if not gaussfit: return {} gfitres = {} nprof = len(prof) res = [None] * nprof for i in range(nprof): res[i] = gaussfit1d(xdata=self.v, ydata=prof[i], sigma=None, show=True, nwalkersperdim=8) gfitres['best'] = [a['popt'][:3] for a in res] gfitres['error'] = [a['perr'][:3] for a in res] return gfitres
[docs] def profile(self, coords: list[str] = [], xlist: list[float] = [], ylist: list[float] = [], ellipse: list[float, float, float] | None = None, ninterp: int = 1, flux: bool = False, gaussfit: bool = False ) -> tuple[np.ndarray, np.ndarray, dict] | None: """Get a list of line profiles at given spatial coordinates. Args: coords (list, optional): Text coordinates. Defaults to []. xlist (list, optional): Offset from center. Defaults to []. ylist (list, optional): Offset from center. Defaults to []. ellipse (list, optional): [major, minor, pa]. For average. Defaults to None. ninterp (int, optional): Number of points for interpolation. Defaults to 1. flux (bool, optional): Jy/beam to Jy. Defaults to False. gaussfit (bool, optional): Fit the profiles. Defaults to False. Returns: tuple: (v, list of profiles, result of Gaussian fit) """ if np.ndim(self.data) != 3 or self.v is None: print('Data must be 3D with the v, y, and x axes.') return data, xf, yf = filled2d(self.data, self.x, self.y, ninterp) x, y = np.meshgrid(xf, yf) if len(coords) > 0: xlist, ylist = coord2xy(coords, self.center) * 3600. nprof = len(xlist) prof = np.empty((nprof, len(self.v))) ellipse = ellipse or [[0, 0, 0]] * nprof calc = np.sum if flux else np.mean for i, (xc, yc, e) in enumerate(zip(xlist, ylist, ellipse)): major, minor, pa = e z = dot2d(Mrot(-pa), [y - yc, x - xc]) if major == 0 or minor == 0: r = np.hypot(*z) idx = np.unravel_index(np.argmin(r), np.shape(r)) prof[i] = [d[idx] for d in data] else: r = np.hypot(*dot2d(Mfac(2 / major, 2 / minor), z)) prof[i] = [calc(d[r <= 1]) for d in data] if flux: if None in self.beam or None in [self.dx, self.dy]: print('None in beam, dx, or dy. Flux is not converted.') else: Omega = np.pi * self.beam[0] * self.beam[1] / 4. / np.log(2.) prof *= np.abs(self.dx * self.dy) / Omega gfitres = self._gfit_profile(prof, gaussfit) return self.v, prof, gfitres
[docs] def rotate(self, pa: float = 0, **kwargs: Any) -> None: """Counter clockwise rotation with respect to the center. Args: pa (float, optional): Position angle in the unit of degree. Defaults to 0. """ yxnew = dot2d(Mrot(-pa), np.meshgrid(self.y, self.x, indexing='ij')) self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs) if self.beam[2] is not None: self.beam[2] = self.beam[2] + pa
[docs] def slice(self, length: float = 0, pa: float = 0, dx: float | None = None, **kwargs: Any) -> np.ndarray | None: """Get 1D slice with given a length and a position-angle. Args: length (float, optional): Slice line length. Defaults to 0. pa (float, optional): Position angle in the unit of degree. Defaults to 0. dx (float, optional): Grid increment. Defaults to None. Returns: np.ndarray: [x, data]. If self.data is 3D, the output data are in the shape of (len(v), len(x)). """ if dx is None and self.dx is not None: dx = np.abs(self.dx) if dx is None: print('dx was not found. Please input dx.') return n = int(np.ceil(length / 2 / dx)) r = np.linspace(-n, n, 2 * n + 1) * dx pa_rad = np.radians(pa) yg, xg = r * np.cos(pa_rad), r * np.sin(pa_rad) z = RGIxy(self.y, self.x, self.data, (yg, xg), **kwargs) return np.array([r, z])
[docs] def todict(self) -> dict: """Output the attributes as a dictionary that can be input to PlotAstroData. Returns: dict: Output that can be input to PlotAstroData. """ d = {'data': self.data, 'x': self.x, 'y': self.y, 'v': self.v, 'fitsimage': self.fitsimage, 'beam': self.beam, 'Tb': self.Tb, 'restfreq': self.restfreq, 'cfactor': self.cfactor, 'sigma': self.sigma, 'center': self.center, 'pv': self.pv, 'bunit': self.bunit} return d
def _put_header(self, h: dict, t: str, crpix: int, crval: float, cdelt: float | None = None) -> None: fhd = self.fitsheader axis = {'x': 1, 'y': 2, 'v': 2 if self.pv else 3}[t] u = f'CUNIT{axis}' indeg = fhd is None or u not in fhd or isdeg(fhd[u]) h[f'NAXIS{axis}'] = len(getattr(self, t)) h[f'CRPIX{axis}'] = int(crpix) h[f'CRVAL{axis}'] = float(crval) if cdelt is None: cdelt = getattr(self, f'd{t}') / (3600 if indeg else 1) h[f'CDELT{axis}'] = float(cdelt) def _get_cvdv_in_freq(self, ck: int) -> tuple[float, float]: cv = self.v[ck] dv = self.dv if self.restfreq is None or self.restfreq == 0: s = 'No valid restfreq. The velocity axis is saved as is.' warnings.warn(s, UserWarning) return cv, dv cv = (1 - cv / cu.c_kms) * self.restfreq dv = -dv / cu.c_kms * self.restfreq return cv, dv
[docs] @_need_multipixels def writetofits(self, fitsimage: str = 'out.fits', header: dict = {}) -> None: """Write out the AstroData to a FITS file. Args: fitsimage (str, optional): Output FITS file name. Existing files with the same name are overwritten. Defaults to 'out.fits'. header (dict, optional): Header dictionary. Defaults to {}. """ h = {} ci = nearest_index(self.x) cx = 0 if not self.pv: cj = nearest_index(self.y) if self.center is None: cx, cy = self.x[ci], self.x[cj] else: xy = [self.x[ci] / 3600, self.y[cj] / 3600] cx, cy = coord2xy(xy2coord(xy, self.center)) self._put_header(h, 'x', crpix=ci + 1, crval=cx) if not self.pv: self._put_header(h, 'y', crpix=cj + 1, crval=cy) if self.dv is not None: ck = nearest_index(self.v) cv, dv = self._get_cvdv_in_freq(ck) self._put_header(h, 'v', crpix=ck + 1, crval=cv, cdelt=dv) if None not in self.beam: beam = self.beam_org if self.pv else self.beam h['BMAJ'] = float(beam[0] / 3600) h['BMIN'] = float(beam[1] / 3600) h['BPA'] = float(beam[2]) h.update(header) data2fits(d=self.data, h=h, templatefits=self.fitsimage_org, fitsimage=fitsimage)
def _as_list(value: Any, n: int, isbeam: bool = False) -> Any: if isbeam: return [value] * n if np.ndim(value) == 1 else value else: return value if isinstance(value, list) else [value] * n def _scalar_if_single(value: Any, n: int) -> Any: return value[0] if n == 1 else value def _get_gridsep(axis: np.ndarray | None) -> float | None: return axis[1] - axis[0] if axis is not None and len(axis) > 1 else None ASTRODATA_ARGS = ['fitsimage', 'data', 'Tb', 'sigma', 'center', 'restfreq', 'cfactor', 'bunit', 'fitsimage_org', 'sigma_org', 'beam_org', 'fitsheader', 'pv', 'pvpa']
[docs] @dataclass class AstroFrame(): """Parameter set to limit and reshape the data in the AstroData format. Args: vmin (float, optional): Velocity at the upper left. Defaults to -1e10. vmax (float, optional): Velocity at the lower bottom. Defaults to 1e10. vsys (float, optional): Each channel shows v-vsys. Defaults to 0.. center (str, optional): Central coordinate like '12h34m56.7s 12d34m56.7s'. Defaults to None. fitsimage (str, optional): Fits to get center. Defaults to None. rmax (float, optional): The x range is [-rmax, rmax]. The y range is [-rmax, rmax]. Defaults to 1e10. xmax (float, optional): The x range is [xmin, xmax]. Defaults to None. xmin (float, optional): The x range is [xmin, xmax]. Defaults to None. ymax (float, optional): The y range is [ymin, ymax]. Defaults to None. ymin (float, optional): The y range is [ymin, ymax]. Defaults to None. dist (float, optional): Change x and y in arcsec to au. Defaults to 1.. xoff (float, optional): Map center relative to the center. Defaults to 0. yoff (float, optional): Map center relative to the center. Defaults to 0. xflip (bool, optional): True means left is positive x. Defaults to True. yflip (bool, optional): True means bottom is positive y. Defaults to False. swapxy (bool, optional): True means x and y are swapped. Defaults to False. pv (bool, optional): Mode for PV diagram. Defaults to False. quadrants (str, optional): '13' or '24'. Quadrants to take mean. None means not taking mean. Defaults to None. """ rmax: float = 1e10 xmax: float | None = None xmin: float | None = None ymax: float | None = None ymin: float | None = None dist: float = 1 center: str | None = None fitsimage: str | None = None xoff: float = 0 yoff: float = 0 vsys: float = 0 vmin: float = -1e20 vmax: float = 1e20 xflip: bool = True yflip: bool = False swapxy: bool = False pv: bool = False quadrants: str | None = None def __post_init__(self) -> None: self.xdir = -1 if self.xflip else 1 self.ydir = -1 if self.yflip else 1 self.xmin = -self.rmax if self.xmin is None else self.xmin self.xmax = self.rmax if self.xmax is None else self.xmax self.ymin = -self.rmax if self.ymin is None else self.ymin self.ymax = self.rmax if self.ymax is None else self.ymax if self.xflip: self.xmin, self.xmax = self.xmax, self.xmin if self.yflip: self.ymin, self.ymax = self.ymax, self.ymin xlim = [self.xoff + self.xmin, self.xoff + self.xmax] ylim = [self.yoff + self.ymin, self.yoff + self.ymax] vlim = [self.vmin, self.vmax] if self.pv: xlim = sorted(xlim) if not self.xflip: xlim.reverse() self.xlim, self.ylim, self.vlim = xlim, ylim, vlim _x, _y = (xlim, vlim) if self.pv else (xlim, ylim) self.Xlim, self.Ylim = (_y, _x) if self.swapxy else (_x, _y) if self.quadrants is not None: self.Xlim = [0, self.rmax] self.Ylim = [0, min(self.vmax, -self.vmin)] if self.fitsimage is not None and self.center is None: self.center = FitsData(self.fitsimage).get_center()
[docs] def pos2xy(self, poslist: list[str | list[float, float]] = [] ) -> np.ndarray: """Text or relative to absolute coordinates. Args: poslist (list, optional): Text coordinates or relative coordinates. Defaults to []. Returns: np.ndarray: absolute coordinates. """ onexy = np.shape(poslist) == (2,) and not isinstance(poslist[0], str) if np.shape(poslist) == () or onexy: poslist = [poslist] x, y = [None] * len(poslist), [None] * len(poslist) for i, p in enumerate(poslist): if isinstance(p, str): x[i], y[i] = coord2xy(p, self.center) * 3600. else: x[i], y[i] = rel2abs(*p, self.Xlim, self.Ylim) return np.array([x, y])
def _get_restfreq(self, header: dict) -> float | None: """Extract rest frequency from FITS header.""" if 'RESTFRQ' in header: return header['RESTFRQ'] if 'RESTFREQ' in header: return header['RESTFREQ'] if 'NAXIS3' in header and header['NAXIS3'] == 1 and not self.pv: return header['CRVAL3'] return None def _read_fitsimage(self, d: AstroData, i: int, grid: list) -> list: """Read FITS-derived values into d and return the FITS grid.""" if d.fitsimage[i] is None: return grid fd = FitsData(d.fitsimage[i]) if d.fitsheader[i] is None: d.fitsheader[i] = fd.get_header() if d.center[i] is None and not self.pv: d.center[i] = fd.get_center() if d.restfreq[i] is None: d.restfreq[i] = self._get_restfreq(d.fitsheader[i]) d.data[i] = fd.get_data() grid = fd.get_grid(center=d.center[i], dist=self.dist, restfreq=d.restfreq[i], vsys=self.vsys, pv=self.pv) if fd.wcsrot: d.center[i] = fd.get_center() d.beam[i] = fd.get_beam(dist=self.dist) d.bunit[i] = fd.get_header('BUNIT') return grid def _shift_center(self, d: AstroData, i: int, grid: list) -> list: corg = d.center[i] cnew = self.center if self.pv or cnew is None or corg is None or corg == cnew: return grid cx, cy = coord2xy(corg, cnew) * 3600 grid[0] = grid[0] + cx # Don't use += cx. grid[1] = grid[1] + cy # Don't use += cy. d.center[i] = cnew return grid def _ascending_v(self, d: AstroData, i: int, v: np.ndarray | None) -> None: if v is not None and len(v) > 1 and v[1] < v[0]: d.data[i], v = d.data[i][::-1], v[::-1] print('Velocity has been inverted.') d.v = v def _xyskip(self, d: AstroData, i: int, x: np.ndarray | None, y: np.ndarray | None) -> None: d.x = x[::self.xskip] d.y = y[::self.yskip] data = np.moveaxis(d.data[i], [-2, -1], [0, 1]) data = data[::self.yskip, ::self.xskip] d.data[i] = np.moveaxis(data, [0, 1], [-2, -1]) def _trim_skip(self, d: AstroData, i: int, grid: list) -> None: d.data[i], grid = trim(data=d.data[i], x=grid[0], y=grid[1], v=grid[2], xlim=self.xlim, ylim=self.ylim, vlim=self.vlim, pv=self.pv) self._ascending_v(d, i, v=grid[2]) grid = [grid[0], d.v] if self.pv else [grid[0], grid[1]] if self.swapxy: grid.reverse() d.data[i] = np.moveaxis(d.data[i], 1, 0) self._xyskip(d, i, x=grid[0], y=grid[1]) if self.pv: d.v = d.y for axis in ['x', 'y', 'v']: setattr(d, f'd{axis}', _get_gridsep(getattr(d, axis))) def _convert_to_Tb(self, d: AstroData, i: int) -> None: """Convert Jy/beam data to brightness temperature if requested.""" if not d.Tb[i]: return dx = d.dy if self.swapxy else d.dx header = {'CDELT1': dx / 3600, 'CUNIT1': 'deg', 'RESTFREQ': d.restfreq[i]} if None not in d.beam[i]: header['BMAJ'] = d.beam[i][0] / 3600 / self.dist header['BMIN'] = d.beam[i][1] / 3600 / self.dist factor = Jy2K(header=header) d.data[i] = d.data[i] * factor if d.sigma[i] is not None: d.sigma[i] = d.sigma[i] * factor def _set_pv_beam(self, d: AstroData, i: int) -> None: """Set effective PV beam.""" if not self.pv or d.pv[i] or None in d.beam[i]: return bmaj, bmin, bpa = d.beam_org[i] = d.beam[i] if d.pvpa[i] is None: d.pvpa[i] = bpa print('pvpa is not specified. pvpa=bpa is assumed.') angle = np.radians(bpa - d.pvpa[i]) beam_incut = 1 / np.hypot(np.cos(angle) / bmaj, np.sin(angle) / bmin) d.beam[i] = np.array([np.abs(d.dv), beam_incut, 0]) def _read_one(self, d: AstroData, i: int) -> None: if d.center[i] == 'common': d.center[i] = self.center d.sigma_org[i] = d.sigma[i] grid = self._read_fitsimage(d, i, grid=[d.x, d.y, d.v]) if d.data[i] is not None: d.sigma[i] = estimate_rms(d.data[i], d.sigma[i]) grid = self._shift_center(d, i, grid) self._trim_skip(d, i, grid) if self.quadrants is not None: d.data[i], d.x, d.y \ = quadrantmean(d.data[i], d.x, d.y, self.quadrants) d.data[i] = d.data[i] * d.cfactor[i] if d.sigma[i] is not None: d.sigma[i] = d.sigma[i] * d.cfactor[i] self._convert_to_Tb(d, i) self._set_pv_beam(d, i) d.pv[i] = self.pv d.Tb[i] = False d.cfactor[i] = 1 d.fitsimage_org[i] = d.fitsimage[i] d.fitsimage[i] = None
[docs] def read(self, d: AstroData, xskip: int = 1, yskip: int = 1) -> None: """Get data, grid, sigma, beam, and bunit from AstroData, which is a part of the input of add_color, add_contour, add_segment, and add_rgb. This method modifies ``d`` in place. During the read, the input fields are normalized to per-dataset lists, FITS-derived values are filled, trimming and coordinate-frame changes are applied, and bookkeeping fields such as ``fitsimage``, ``fitsimage_org``, ``Tb``, ``cfactor``, and ``sigma`` are updated. Args: d (AstroData): Dataclass for the add_* input. xskip, yskip (int): Spatial pixel skip. Defaults to 1. """ self.xskip = xskip self.yskip = yskip for name in ASTRODATA_ARGS: setattr(d, name, _as_list(getattr(d, name), d.n)) d.beam = _as_list(d.beam, d.n, isbeam=True) for i in range(d.n): self._read_one(d, i) for name in ASTRODATA_ARGS + ['beam']: setattr(d, name, _scalar_if_single(getattr(d, name), d.n))