import numpy as np
import warnings
from dataclasses import dataclass
from scipy.interpolate import RegularGridInterpolator as RGI
from scipy.signal import convolve
from typing import Any, Callable
from plotastrodata import const_utils as cu
from plotastrodata.coord_utils import coord2xy, rel2abs, xy2coord
from plotastrodata.fits_utils import data2fits, FitsData, Jy2K
from plotastrodata.fitting_utils import (EmceeCorner, gaussfit1d,
gaussfit2d, gaussian2d)
from plotastrodata.matrix_utils import dot2d, Mfac, Mrot
from plotastrodata.noise_utils import estimate_rms
from plotastrodata.other_utils import (isdeg, nearest_index,
RGIxy, RGIxyv, to4dim, trim)
[docs]
def quadrantmean(data: np.ndarray, x: np.ndarray, y: np.ndarray,
quadrants: str = '13'
) -> tuple[np.ndarray, np.ndarray, np.ndarray] | None:
"""Take mean between 1st and 3rd (or 2nd and 4th) quadrants.
Args:
data (np.ndarray): 2D array.
x (np.ndarray): 1D array. First coordinate.
y (np.ndarray): 1D array. Second coordinate.
quadrants (str, optional): '13' or '24'. Defaults to '13'.
Returns:
tuple: Averaged (data, x, y).
"""
if np.ndim(data) != 2:
print('data must be 2D.')
return
if quadrants not in ['13', '24']:
print("quadrants must be '13' or '24'.")
return
dx = x[1] - x[0]
dy = y[1] - y[0]
nx = int(np.ceil(np.max(np.abs(x)) / dx))
ny = int(np.ceil(np.max(np.abs(y)) / dy))
xnew = np.linspace(-nx, nx, 2 * nx + 1) * dx
ynew = np.linspace(-ny, ny, 2 * ny + 1) * dy
s = 1 if quadrants == '13' else -1
f = RGI((y, s * x), data, bounds_error=False, fill_value=np.nan)
datanew = f(np.meshgrid(ynew, xnew, indexing='ij'))
datanew = (datanew + datanew[::-1, ::-1]) / 2.
return datanew[ny:, nx:], xnew[nx:], ynew[ny:]
[docs]
def filled2d(data: np.ndarray, x: np.ndarray, y: np.ndarray, n: int = 1,
**kwargs: Any) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Fill 2D data, 1D x, and 1D y by a factor of n using RGI.
Args:
data (np.ndarray): 2D or 3D array.
x (np.ndarray): 1D array.
y (np.ndarray): 1D array.
n (int, optional): How many times more the new grid is. Defaults to 1.
Returns:
tuple: The interpolated (data, x, y).
"""
xnew = np.linspace(x[0], x[-1], n * (len(x) - 1) + 1)
ynew = np.linspace(y[0], y[-1], n * (len(y) - 1) + 1)
d = RGIxy(y, x, data, np.meshgrid(ynew, xnew, indexing='ij'),
**kwargs)
return d, xnew, ynew
def _need_multipixels(method: Callable) -> Callable:
def wrapper(cls: Any, *args: Any, **kwargs: Any) -> Any | None:
singlepixel = cls.dx is None or cls.dy is None
if singlepixel:
print('No pixel size.')
return
return method(cls, *args, **kwargs)
return wrapper
[docs]
@dataclass
class AstroData():
"""Data to be processed and parameters for processing the data.
Args:
data (np.ndarray, optional): 2D or 3D array. Defaults to None.
x (np.ndarray, optional): 1D array. Defaults to None.
y (np.ndarray, optional): 1D array. Defaults to None.
v (np.ndarray, optional): 1D array. Defaults to None.
beam (np.ndarray, optional): [bmaj, bmin, bpa]. Defaults to [None, None, None].
fitsimage (str, optional): Input fits name. Defaults to None.
Tb (bool, optional): True means the data array is brightness temperature. Defaults to False.
sigma (float or str, optional): Noise level or method for measuring it. Defaults to 'hist'.
center (str, optional): Text coordinates. 'common' means initialized value. Defaults to 'common'.
restfreq (float, optional): Used for velocity and brightness temperature. Defaults to None.
cfactor (float, optional): The data array is multiplied by cfactor. Defaults to 1.
pvpa (float, optional): Position angle of the PV cut. Defaults to None.
pv (bool, optional): True means the data array is a position-velocity diagram. Defaults to False.
bunit (str, optional): The unit of the data array. Defaults to ''.
"""
data: np.ndarray | None = None
x: np.ndarray | None = None
y: np.ndarray | None = None
v: np.ndarray | None = None
beam: np.ndarray | tuple[None] = (None, None, None)
fitsimage: str | None = None
Tb: bool = False
sigma: str | float | None = 'hist'
center: str = 'common'
restfreq: float | None = None
cfactor: float = 1
pvpa: float | None = None
pv: bool = False
bunit: str = ''
def __post_init__(self) -> None:
n = 0
if self.fitsimage is not None:
if not isinstance(self.fitsimage, list):
n = 1
elif any(a is not None for a in self.fitsimage):
n = len(self.fitsimage)
else:
n = 0
if n > 0:
self.data = None
if self.data is not None:
if not isinstance(self.data, list):
n = 1
elif any(a is not None for a in self.data):
n = len(self.data)
else:
n = 0
if n == 0:
print('Either data or fitsimage must be given.')
self.n = n
self.fitsimage_org = None
self.sigma_org = None
self.beam_org = None
self.fitsheader = None
def _binning_one(self, t: str, width: float) -> None:
grid = getattr(self, t)
if width == 1 or grid is None:
return
dt = f'd{t}'
sep = getattr(self, dt)
if sep is None:
s = f'Skip binning in the {t}-axis because {dt} is None.'
warnings.warn(s, UserWarning)
return
i = {'v': 1, 'y': 2, 'x': 3}[t]
sizenew = self.size[i] // width
self.size[i] = sizenew
data = np.moveaxis(self.data, i, 0)
datanew = np.moveaxis(np.zeros(self.size), i, 0)
gridnew = np.zeros(sizenew)
for start in range(width):
stop = start + sizenew * width
datanew += data[start:stop:width]
gridnew += grid[start:stop:width]
self.data = np.moveaxis(datanew, 0, i) / width
setattr(self, t, gridnew / width)
setattr(self, dt, sep * width)
[docs]
def binning(self, width: list[int] = [1, 1, 1]) -> None:
"""Binning up neighboring pixels in the v, y, and x domain.
Args:
width (list, optional): Number of channels, y-pixels, and x-pixels for binning. Defaults to [1, 1, 1].
"""
w = np.array([1] * (3 - len(width)) + list(width), dtype=int)
if self.pv:
w[1] = max(w[0], w[1])
w[2] = 1
self.data = to4dim(self.data)
size = np.array(np.shape(self.data))
if np.any(w > size[1:]):
w = np.minimum(w, size[1:])
ws = ', '.join([f'{s:d}' for s in w])
print(f'width was changed to [{ws}].')
if (not self.pv and w[0] > 1) or (self.pv and w[1] > 1):
width_v = w[1] if self.pv else w[0]
print(f'sigma has been divided by sqrt({width_v:d})'
+ ' because of binning in the v-axis.')
self.sigma = self.sigma / np.sqrt(width_v)
if (not self.pv and w[1] > 1) or w[2] > 1:
print('Binning in the x- or y-axis does not update sigma.')
self.size = size
for t, ww in zip(['v', 'y', 'x'], w):
self._binning_one(t, ww)
self.data = np.squeeze(self.data)
del self.size
if self.pv:
self.v = self.y
self.dv = self.dy
[docs]
def centering(self, includexy: bool = True,
includev: bool = False,
**kwargs: Any) -> None:
"""Spatial regridding to set the center at (x,y,v)=(0,0,0).
Args:
includexy (bool, optional): Centering in the x and y directions at each channel. Defaults to True.
includev (bool, optional): Centering in the v direction at each position. Defaults to False.
"""
if includexy:
xnew = self.x - self.x[nearest_index(self.x)]
ynew = self.y - self.y[nearest_index(self.y)]
if includev:
vnew = self.v - self.v[nearest_index(self.v)]
if includexy and includev:
self.data = RGIxyv(self.v, self.y, self.x, self.data,
np.meshgrid(vnew, ynew, xnew, indexing='ij'),
**kwargs)
self.v, self.y, self.x = vnew, ynew, xnew
elif includexy:
self.data = RGIxy(self.y, self.x, self.data,
np.meshgrid(ynew, xnew, indexing='ij'),
**kwargs)
self.y, self.x = ynew, xnew
elif includev:
nx, ny, nv = len(self.x), len(self.y), len(self.v)
a = np.empty((ny, nx, nv))
for i in range(ny):
for j in range(nx):
f = RGI((self.v,), self.data[:, i, j], method='linear',
bounds_error=False, fill_value=np.nan)
a[i, j] = f(vnew)
self.data = np.moveaxis(a, -1, 0)
self.v = vnew
else:
print('No change because includexy=False and includev=False.')
[docs]
@_need_multipixels
def circularbeam(self) -> None:
"""Make the beam circular by convolving with 1D Gaussian
"""
if None in self.beam:
print('No beam.')
return
bmaj, bmin, bpa = self.beam
self.rotate(-bpa)
nx = len(self.x) if len(self.x) % 2 == 1 else len(self.x) - 1
ny = len(self.y) if len(self.y) % 2 == 1 else len(self.y) - 1
y = np.linspace(-(ny-1) / 2, (ny-1) / 2, ny) * np.abs(self.dy)
g1 = np.exp(-4 * np.log(2) * y**2 / (bmaj**2 - bmin**2))
e = np.sqrt(1 - bmin**2 / bmaj**2)
g1 /= np.sqrt(np.pi / 4 / np.log(2) * bmin * e)
g = np.zeros((ny, nx))
g[:, (nx - 1) // 2] = g1
d = self.data.copy()
d[np.isnan(d)] = 0
self.data = np.squeeze(convolve(to4dim(d), [[g]], mode='same'))
self.rotate(bpa)
self.beam[1] = self.beam[0]
self.beam[2] = 0
[docs]
def deproject(self, pa: float = 0, incl: float = 0,
**kwargs: Any) -> None:
"""Exapnd by a factor of 1/cos(incl) in the direction of pa+90 deg.
Args:
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
incl (float, optional): Inclination angle in the unit of degree. Defaults to 0.
"""
ci = np.cos(np.radians(incl))
A = np.linalg.multi_dot([Mrot(pa), Mfac(1, ci), Mrot(-pa)])
yxnew = dot2d(A, np.meshgrid(self.y, self.x, indexing='ij'))
self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs)
if None not in self.beam:
bmaj, bmin, bpa = self.beam
a, b = np.linalg.multi_dot([Mfac(1 / bmaj, 1 / bmin),
Mrot(pa - bpa),
Mfac(1, ci),
Mrot(-pa)]).T
alpha = (np.dot(a, a) + np.dot(b, b)) / 2
beta = np.dot(a, b)
gamma = (np.dot(a, a) - np.dot(b, b)) / 2
bpa_new = np.arctan(beta / gamma) / 2 * np.degrees(1)
if beta * bpa_new >= 0:
bpa_new += 90
Det = np.sqrt(beta**2 + gamma**2)
bmaj_new = 1 / np.sqrt(alpha - Det)
bmin_new = 1 / np.sqrt(alpha + Det)
self.beam = np.array([bmaj_new, bmin_new, bpa_new])
[docs]
@_need_multipixels
def fit2d(self, model: Callable, bounds: np.ndarray,
progressbar: bool = False,
kwargs_fit: dict = {}, kwargs_plotcorner: dict = {},
chan: int | None = None) -> dict[str, Any] | None:
"""Fit a given 2D model function to self.data.
Default keyword values:
kwargs_plotcorner: ``show=False`` and ``savefig=None``. User-supplied values in ``kwargs_plotcorner`` override these defaults.
Args:
model (function): The model function in the form of f(par, x, y).
bounds (np.ndarray): bounds for fitting_utils.EmceeCorner.
progressbar (bool, optional): progressbar for fitting_utils.EmceeCorner. Defaults to False.
kwargs_fit (dict, optional): Arguments for fitting_utils.EmceeCorner.fit.
kwargs_plotcorner (dict, optional): Arguments for fitting_utils.EmceeCorner.plotcorner.
chan (int, optional): The channel number where the 2D model is fitted. Defaults to None.
Returns:
dict: The parameter sets (popt, plow, pmid, and phigh), the best 2D model array (model), and the residual from the model (residual).
"""
d = self.data if chan is None else self.data[chan]
x, y = np.meshgrid(self.x, self.y)
if None not in self.beam:
Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2)
pixelperbeam = Omega / np.abs(self.dx * self.dy)
else:
pixelperbeam = 1.
s = 'sigma is multiplied by sqrt(pixel-per-beam)' \
+ ' to consider the noise correlation in a beam.' \
+ ' This correction is relatively conservative.'
warnings.warn(s, UserWarning)
def logl(p: np.ndarray) -> float:
rss = np.nansum((model(p, x, y) - d)**2)
return -0.5 * rss / self.sigma**2 / pixelperbeam
mcmc = EmceeCorner(bounds=bounds, logl=logl,
progressbar=progressbar)
kwargs_fit0 = {}
kwargs_fit0.update(kwargs_fit)
mcmc.fit(**kwargs_fit0)
kwargs_plotcorner0 = {'show': False, 'savefig': None}
kwargs_plotcorner0.update(kwargs_plotcorner)
kw_pl = kwargs_plotcorner0
if kw_pl['show'] or kw_pl['savefig'] is not None:
mcmc.plotcorner(**kw_pl)
popt = mcmc.popt
plow = mcmc.plow
pmid = mcmc.pmid
phigh = mcmc.phigh
modelopt = model(popt, x, y)
residual = d - modelopt
return {'popt': popt, 'plow': plow, 'pmid': pmid, 'phigh': phigh,
'model': modelopt, 'residual': residual}
[docs]
@_need_multipixels
def gaussfit2d(self, chan: int | None = None) -> dict:
"""Fit a 2D Gaussian function to self.data using fitting_utils.gaussfit2d().
Args:
chan (int): The channel number where the 2D Gaussian is fitted. Defaults to None.
Returns:
dict: The best parameter set (popt), the error set (perr), the best 2D Gaussian array (model), the residual from the model (residual), and the coordinates of the best-fit center (center).
"""
z = self.data if chan is None else self.data[chan]
Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2)
pixelperbeam = Omega / np.abs(self.dx * self.dy)
s = 'sigma is multiplied by sqrt(pixel-per-beam)' \
+ ' to consider the noise correlation in a beam.' \
+ ' This correction is relatively conservative.'
warnings.warn(s, UserWarning)
res = gaussfit2d(xdata=self.x, ydata=self.y, zdata=z,
sigma=self.sigma * np.sqrt(pixelperbeam),
show=False, nwalkersperdim=4)
popt, perr = res['popt'], res['perr']
model = gaussian2d(np.meshgrid(self.x, self.y), *popt)
residual = z - model
if self.center is not None:
newcenter = xy2coord(popt[1:3] / 3600, coordorg=self.center)
else:
newcenter = None
return {'popt': popt, 'perr': perr,
'model': model, 'residual': residual,
'center': newcenter}
[docs]
def histogram(self, **kwargs: Any) -> tuple[np.ndarray, np.ndarray]:
"""Output histogram of self.data using numpy.histogram. This method can take the arguments of numpy.histogram.
Returns:
tuple: (bins, histogram)
"""
hist, hbin = np.histogram(self.data[~np.isnan(self.data)],
**kwargs)
hbin = (hbin[:-1] + hbin[1:]) / 2
return hbin, hist
[docs]
def mask(self, dataformask: np.ndarray | None = None,
includepix: list[float, float] = [],
excludepix: list[float, float] = []) -> None:
"""Mask self.data using a 2D or 3D array of dataformask.
Args:
dataformask (np.ndarray, optional): 2D or 3D array is used for specifying the mask.
includepix (list, optional): Data in this range survives. Defaults to [].
excludepix (list, optional): Data in this range is masked. Defaults to [].
"""
if dataformask is None:
dataformask = self.data
if np.ndim(self.data) > np.ndim(dataformask):
print('The mask is broadcasted.')
mask = np.full(np.shape(self.data), dataformask)
else:
mask = dataformask
if np.shape(self.data) != np.shape(mask):
print('The dataformask has a different shape.')
return
if len(includepix) == 2:
self.data[(mask < includepix[0]) + (includepix[1] < mask)] = np.nan
if len(excludepix) == 2:
self.data[(excludepix[0] < mask) * (mask < excludepix[1])] = np.nan
def _gfit_profile(self, prof: list, gaussfit: bool) -> dict[str, Any]:
if not gaussfit:
return {}
gfitres = {}
nprof = len(prof)
res = [None] * nprof
for i in range(nprof):
res[i] = gaussfit1d(xdata=self.v, ydata=prof[i],
sigma=None, show=True,
nwalkersperdim=8)
gfitres['best'] = [a['popt'][:3] for a in res]
gfitres['error'] = [a['perr'][:3] for a in res]
return gfitres
[docs]
def profile(self, coords: list[str] = [],
xlist: list[float] = [], ylist: list[float] = [],
ellipse: list[float, float, float] | None = None,
ninterp: int = 1,
flux: bool = False, gaussfit: bool = False
) -> tuple[np.ndarray, np.ndarray, dict] | None:
"""Get a list of line profiles at given spatial coordinates.
Args:
coords (list, optional): Text coordinates. Defaults to [].
xlist (list, optional): Offset from center. Defaults to [].
ylist (list, optional): Offset from center. Defaults to [].
ellipse (list, optional): [major, minor, pa]. For average. Defaults to None.
ninterp (int, optional): Number of points for interpolation. Defaults to 1.
flux (bool, optional): Jy/beam to Jy. Defaults to False.
gaussfit (bool, optional): Fit the profiles. Defaults to False.
Returns:
tuple: (v, list of profiles, result of Gaussian fit)
"""
if np.ndim(self.data) != 3 or self.v is None:
print('Data must be 3D with the v, y, and x axes.')
return
data, xf, yf = filled2d(self.data, self.x, self.y, ninterp)
x, y = np.meshgrid(xf, yf)
if len(coords) > 0:
xlist, ylist = coord2xy(coords, self.center) * 3600.
nprof = len(xlist)
prof = np.empty((nprof, len(self.v)))
ellipse = ellipse or [[0, 0, 0]] * nprof
calc = np.sum if flux else np.mean
for i, (xc, yc, e) in enumerate(zip(xlist, ylist, ellipse)):
major, minor, pa = e
z = dot2d(Mrot(-pa), [y - yc, x - xc])
if major == 0 or minor == 0:
r = np.hypot(*z)
idx = np.unravel_index(np.argmin(r), np.shape(r))
prof[i] = [d[idx] for d in data]
else:
r = np.hypot(*dot2d(Mfac(2 / major, 2 / minor), z))
prof[i] = [calc(d[r <= 1]) for d in data]
if flux:
if None in self.beam or None in [self.dx, self.dy]:
print('None in beam, dx, or dy. Flux is not converted.')
else:
Omega = np.pi * self.beam[0] * self.beam[1] / 4. / np.log(2.)
prof *= np.abs(self.dx * self.dy) / Omega
gfitres = self._gfit_profile(prof, gaussfit)
return self.v, prof, gfitres
[docs]
def rotate(self, pa: float = 0, **kwargs: Any) -> None:
"""Counter clockwise rotation with respect to the center.
Args:
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
"""
yxnew = dot2d(Mrot(-pa), np.meshgrid(self.y, self.x, indexing='ij'))
self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs)
if self.beam[2] is not None:
self.beam[2] = self.beam[2] + pa
[docs]
def slice(self, length: float = 0, pa: float = 0,
dx: float | None = None, **kwargs: Any) -> np.ndarray | None:
"""Get 1D slice with given a length and a position-angle.
Args:
length (float, optional): Slice line length. Defaults to 0.
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
dx (float, optional): Grid increment. Defaults to None.
Returns:
np.ndarray: [x, data]. If self.data is 3D, the output data are in the shape of (len(v), len(x)).
"""
if dx is None and self.dx is not None:
dx = np.abs(self.dx)
if dx is None:
print('dx was not found. Please input dx.')
return
n = int(np.ceil(length / 2 / dx))
r = np.linspace(-n, n, 2 * n + 1) * dx
pa_rad = np.radians(pa)
yg, xg = r * np.cos(pa_rad), r * np.sin(pa_rad)
z = RGIxy(self.y, self.x, self.data, (yg, xg), **kwargs)
return np.array([r, z])
[docs]
def todict(self) -> dict:
"""Output the attributes as a dictionary that can be input to PlotAstroData.
Returns:
dict: Output that can be input to PlotAstroData.
"""
d = {'data': self.data, 'x': self.x, 'y': self.y, 'v': self.v,
'fitsimage': self.fitsimage, 'beam': self.beam, 'Tb': self.Tb,
'restfreq': self.restfreq, 'cfactor': self.cfactor,
'sigma': self.sigma, 'center': self.center, 'pv': self.pv,
'bunit': self.bunit}
return d
def _put_header(self, h: dict, t: str, crpix: int, crval: float,
cdelt: float | None = None) -> None:
fhd = self.fitsheader
axis = {'x': 1, 'y': 2, 'v': 2 if self.pv else 3}[t]
u = f'CUNIT{axis}'
indeg = fhd is None or u not in fhd or isdeg(fhd[u])
h[f'NAXIS{axis}'] = len(getattr(self, t))
h[f'CRPIX{axis}'] = int(crpix)
h[f'CRVAL{axis}'] = float(crval)
if cdelt is None:
cdelt = getattr(self, f'd{t}') / (3600 if indeg else 1)
h[f'CDELT{axis}'] = float(cdelt)
def _get_cvdv_in_freq(self, ck: int) -> tuple[float, float]:
cv = self.v[ck]
dv = self.dv
if self.restfreq is None or self.restfreq == 0:
s = 'No valid restfreq. The velocity axis is saved as is.'
warnings.warn(s, UserWarning)
return cv, dv
cv = (1 - cv / cu.c_kms) * self.restfreq
dv = -dv / cu.c_kms * self.restfreq
return cv, dv
[docs]
@_need_multipixels
def writetofits(self, fitsimage: str = 'out.fits',
header: dict = {}) -> None:
"""Write out the AstroData to a FITS file.
Args:
fitsimage (str, optional): Output FITS file name. Existing files with the same name are overwritten. Defaults to 'out.fits'.
header (dict, optional): Header dictionary. Defaults to {}.
"""
h = {}
ci = nearest_index(self.x)
cx = 0
if not self.pv:
cj = nearest_index(self.y)
if self.center is None:
cx, cy = self.x[ci], self.x[cj]
else:
xy = [self.x[ci] / 3600, self.y[cj] / 3600]
cx, cy = coord2xy(xy2coord(xy, self.center))
self._put_header(h, 'x', crpix=ci + 1, crval=cx)
if not self.pv:
self._put_header(h, 'y', crpix=cj + 1, crval=cy)
if self.dv is not None:
ck = nearest_index(self.v)
cv, dv = self._get_cvdv_in_freq(ck)
self._put_header(h, 'v', crpix=ck + 1, crval=cv, cdelt=dv)
if None not in self.beam:
beam = self.beam_org if self.pv else self.beam
h['BMAJ'] = float(beam[0] / 3600)
h['BMIN'] = float(beam[1] / 3600)
h['BPA'] = float(beam[2])
h.update(header)
data2fits(d=self.data, h=h, templatefits=self.fitsimage_org,
fitsimage=fitsimage)
def _as_list(value: Any, n: int, isbeam: bool = False) -> Any:
if isbeam:
return [value] * n if np.ndim(value) == 1 else value
else:
return value if isinstance(value, list) else [value] * n
def _scalar_if_single(value: Any, n: int) -> Any:
return value[0] if n == 1 else value
def _get_gridsep(axis: np.ndarray | None) -> float | None:
return axis[1] - axis[0] if axis is not None and len(axis) > 1 else None
ASTRODATA_ARGS = ['fitsimage', 'data', 'Tb', 'sigma', 'center', 'restfreq',
'cfactor', 'bunit', 'fitsimage_org', 'sigma_org',
'beam_org', 'fitsheader', 'pv', 'pvpa']
[docs]
@dataclass
class AstroFrame():
"""Parameter set to limit and reshape the data in the AstroData format.
Args:
vmin (float, optional): Velocity at the upper left. Defaults to -1e10.
vmax (float, optional): Velocity at the lower bottom. Defaults to 1e10.
vsys (float, optional): Each channel shows v-vsys. Defaults to 0..
center (str, optional): Central coordinate like '12h34m56.7s 12d34m56.7s'. Defaults to None.
fitsimage (str, optional): Fits to get center. Defaults to None.
rmax (float, optional): The x range is [-rmax, rmax]. The y range is [-rmax, rmax]. Defaults to 1e10.
xmax (float, optional): The x range is [xmin, xmax]. Defaults to None.
xmin (float, optional): The x range is [xmin, xmax]. Defaults to None.
ymax (float, optional): The y range is [ymin, ymax]. Defaults to None.
ymin (float, optional): The y range is [ymin, ymax]. Defaults to None.
dist (float, optional): Change x and y in arcsec to au. Defaults to 1..
xoff (float, optional): Map center relative to the center. Defaults to 0.
yoff (float, optional): Map center relative to the center. Defaults to 0.
xflip (bool, optional): True means left is positive x. Defaults to True.
yflip (bool, optional): True means bottom is positive y. Defaults to False.
swapxy (bool, optional): True means x and y are swapped. Defaults to False.
pv (bool, optional): Mode for PV diagram. Defaults to False.
quadrants (str, optional): '13' or '24'. Quadrants to take mean. None means not taking mean. Defaults to None.
"""
rmax: float = 1e10
xmax: float | None = None
xmin: float | None = None
ymax: float | None = None
ymin: float | None = None
dist: float = 1
center: str | None = None
fitsimage: str | None = None
xoff: float = 0
yoff: float = 0
vsys: float = 0
vmin: float = -1e20
vmax: float = 1e20
xflip: bool = True
yflip: bool = False
swapxy: bool = False
pv: bool = False
quadrants: str | None = None
def __post_init__(self) -> None:
self.xdir = -1 if self.xflip else 1
self.ydir = -1 if self.yflip else 1
self.xmin = -self.rmax if self.xmin is None else self.xmin
self.xmax = self.rmax if self.xmax is None else self.xmax
self.ymin = -self.rmax if self.ymin is None else self.ymin
self.ymax = self.rmax if self.ymax is None else self.ymax
if self.xflip:
self.xmin, self.xmax = self.xmax, self.xmin
if self.yflip:
self.ymin, self.ymax = self.ymax, self.ymin
xlim = [self.xoff + self.xmin, self.xoff + self.xmax]
ylim = [self.yoff + self.ymin, self.yoff + self.ymax]
vlim = [self.vmin, self.vmax]
if self.pv:
xlim = sorted(xlim)
if not self.xflip:
xlim.reverse()
self.xlim, self.ylim, self.vlim = xlim, ylim, vlim
_x, _y = (xlim, vlim) if self.pv else (xlim, ylim)
self.Xlim, self.Ylim = (_y, _x) if self.swapxy else (_x, _y)
if self.quadrants is not None:
self.Xlim = [0, self.rmax]
self.Ylim = [0, min(self.vmax, -self.vmin)]
if self.fitsimage is not None and self.center is None:
self.center = FitsData(self.fitsimage).get_center()
[docs]
def pos2xy(self, poslist: list[str | list[float, float]] = []
) -> np.ndarray:
"""Text or relative to absolute coordinates.
Args:
poslist (list, optional): Text coordinates or relative coordinates. Defaults to [].
Returns:
np.ndarray: absolute coordinates.
"""
onexy = np.shape(poslist) == (2,) and not isinstance(poslist[0], str)
if np.shape(poslist) == () or onexy:
poslist = [poslist]
x, y = [None] * len(poslist), [None] * len(poslist)
for i, p in enumerate(poslist):
if isinstance(p, str):
x[i], y[i] = coord2xy(p, self.center) * 3600.
else:
x[i], y[i] = rel2abs(*p, self.Xlim, self.Ylim)
return np.array([x, y])
def _get_restfreq(self, header: dict) -> float | None:
"""Extract rest frequency from FITS header."""
if 'RESTFRQ' in header:
return header['RESTFRQ']
if 'RESTFREQ' in header:
return header['RESTFREQ']
if 'NAXIS3' in header and header['NAXIS3'] == 1 and not self.pv:
return header['CRVAL3']
return None
def _read_fitsimage(self, d: AstroData, i: int, grid: list) -> list:
"""Read FITS-derived values into d and return the FITS grid."""
if d.fitsimage[i] is None:
return grid
fd = FitsData(d.fitsimage[i])
if d.fitsheader[i] is None:
d.fitsheader[i] = fd.get_header()
if d.center[i] is None and not self.pv:
d.center[i] = fd.get_center()
if d.restfreq[i] is None:
d.restfreq[i] = self._get_restfreq(d.fitsheader[i])
d.data[i] = fd.get_data()
grid = fd.get_grid(center=d.center[i], dist=self.dist,
restfreq=d.restfreq[i], vsys=self.vsys,
pv=self.pv)
if fd.wcsrot:
d.center[i] = fd.get_center()
d.beam[i] = fd.get_beam(dist=self.dist)
d.bunit[i] = fd.get_header('BUNIT')
return grid
def _shift_center(self, d: AstroData, i: int, grid: list) -> list:
corg = d.center[i]
cnew = self.center
if self.pv or cnew is None or corg is None or corg == cnew:
return grid
cx, cy = coord2xy(corg, cnew) * 3600
grid[0] = grid[0] + cx # Don't use += cx.
grid[1] = grid[1] + cy # Don't use += cy.
d.center[i] = cnew
return grid
def _ascending_v(self, d: AstroData, i: int,
v: np.ndarray | None) -> None:
if v is not None and len(v) > 1 and v[1] < v[0]:
d.data[i], v = d.data[i][::-1], v[::-1]
print('Velocity has been inverted.')
d.v = v
def _xyskip(self, d: AstroData, i: int,
x: np.ndarray | None, y: np.ndarray | None) -> None:
d.x = x[::self.xskip]
d.y = y[::self.yskip]
data = np.moveaxis(d.data[i], [-2, -1], [0, 1])
data = data[::self.yskip, ::self.xskip]
d.data[i] = np.moveaxis(data, [0, 1], [-2, -1])
def _trim_skip(self, d: AstroData, i: int, grid: list) -> None:
d.data[i], grid = trim(data=d.data[i],
x=grid[0], y=grid[1], v=grid[2],
xlim=self.xlim, ylim=self.ylim,
vlim=self.vlim, pv=self.pv)
self._ascending_v(d, i, v=grid[2])
grid = [grid[0], d.v] if self.pv else [grid[0], grid[1]]
if self.swapxy:
grid.reverse()
d.data[i] = np.moveaxis(d.data[i], 1, 0)
self._xyskip(d, i, x=grid[0], y=grid[1])
if self.pv:
d.v = d.y
for axis in ['x', 'y', 'v']:
setattr(d, f'd{axis}', _get_gridsep(getattr(d, axis)))
def _convert_to_Tb(self, d: AstroData, i: int) -> None:
"""Convert Jy/beam data to brightness temperature if requested."""
if not d.Tb[i]:
return
dx = d.dy if self.swapxy else d.dx
header = {'CDELT1': dx / 3600,
'CUNIT1': 'deg',
'RESTFREQ': d.restfreq[i]}
if None not in d.beam[i]:
header['BMAJ'] = d.beam[i][0] / 3600 / self.dist
header['BMIN'] = d.beam[i][1] / 3600 / self.dist
factor = Jy2K(header=header)
d.data[i] = d.data[i] * factor
if d.sigma[i] is not None:
d.sigma[i] = d.sigma[i] * factor
def _set_pv_beam(self, d: AstroData, i: int) -> None:
"""Set effective PV beam."""
if not self.pv or d.pv[i] or None in d.beam[i]:
return
bmaj, bmin, bpa = d.beam_org[i] = d.beam[i]
if d.pvpa[i] is None:
d.pvpa[i] = bpa
print('pvpa is not specified. pvpa=bpa is assumed.')
angle = np.radians(bpa - d.pvpa[i])
beam_incut = 1 / np.hypot(np.cos(angle) / bmaj, np.sin(angle) / bmin)
d.beam[i] = np.array([np.abs(d.dv), beam_incut, 0])
def _read_one(self, d: AstroData, i: int) -> None:
if d.center[i] == 'common':
d.center[i] = self.center
d.sigma_org[i] = d.sigma[i]
grid = self._read_fitsimage(d, i, grid=[d.x, d.y, d.v])
if d.data[i] is not None:
d.sigma[i] = estimate_rms(d.data[i], d.sigma[i])
grid = self._shift_center(d, i, grid)
self._trim_skip(d, i, grid)
if self.quadrants is not None:
d.data[i], d.x, d.y \
= quadrantmean(d.data[i], d.x, d.y, self.quadrants)
d.data[i] = d.data[i] * d.cfactor[i]
if d.sigma[i] is not None:
d.sigma[i] = d.sigma[i] * d.cfactor[i]
self._convert_to_Tb(d, i)
self._set_pv_beam(d, i)
d.pv[i] = self.pv
d.Tb[i] = False
d.cfactor[i] = 1
d.fitsimage_org[i] = d.fitsimage[i]
d.fitsimage[i] = None
[docs]
def read(self, d: AstroData, xskip: int = 1, yskip: int = 1) -> None:
"""Get data, grid, sigma, beam, and bunit from AstroData, which is a part of the input of add_color, add_contour, add_segment, and add_rgb.
This method modifies ``d`` in place. During the read, the input fields are normalized to per-dataset lists, FITS-derived values are filled, trimming and coordinate-frame changes are applied, and bookkeeping fields such as ``fitsimage``, ``fitsimage_org``, ``Tb``, ``cfactor``, and ``sigma`` are updated.
Args:
d (AstroData): Dataclass for the add_* input.
xskip, yskip (int): Spatial pixel skip. Defaults to 1.
"""
self.xskip = xskip
self.yskip = yskip
for name in ASTRODATA_ARGS:
setattr(d, name, _as_list(getattr(d, name), d.n))
d.beam = _as_list(d.beam, d.n, isbeam=True)
for i in range(d.n):
self._read_one(d, i)
for name in ASTRODATA_ARGS + ['beam']:
setattr(d, name, _scalar_if_single(getattr(d, name), d.n))