import numpy as np
import warnings
from dataclasses import dataclass
from scipy.interpolate import RegularGridInterpolator as RGI
from scipy.signal import convolve
from plotastrodata import const_utils as cu
from plotastrodata.coord_utils import coord2xy, rel2abs, xy2coord
from plotastrodata.fits_utils import data2fits, FitsData, Jy2K
from plotastrodata.fitting_utils import (EmceeCorner, gaussfit1d,
gaussfit2d, gaussian2d)
from plotastrodata.matrix_utils import dot2d, Mfac, Mrot
from plotastrodata.noise_utils import estimate_rms
from plotastrodata.other_utils import (isdeg, nearest_index,
RGIxy, RGIxyv, to4dim, trim)
[docs]
def quadrantmean(data: np.ndarray, x: np.ndarray, y: np.ndarray,
quadrants: str = '13'
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Take mean between 1st and 3rd (or 2nd and 4th) quadrants.
Args:
data (np.ndarray): 2D array.
x (np.ndarray): 1D array. First coordinate.
y (np.ndarray): 1D array. Second coordinate.
quadrants (str, optional): '13' or '24'. Defaults to '13'.
Returns:
tuple: Averaged (data, x, y).
"""
if np.ndim(data) != 2:
print('data must be 2D.')
return
dx, dy = x[1] - x[0], y[1] - y[0]
nx = int(np.floor(max(np.abs(x[0]), np.abs(x[-1])) / dx))
ny = int(np.floor(max(np.abs(y[0]), np.abs(y[-1])) / dy))
xnew = np.linspace(-nx * dx, nx * dx, 2 * nx + 1)
ynew = np.linspace(-ny * dy, ny * dy, 2 * ny + 1)
Xnew, Ynew = np.meshgrid(xnew, ynew)
if quadrants in ['13', '24']:
s = 1 if quadrants == '13' else -1
f = RGI((y, s * x), data, bounds_error=False, fill_value=np.nan)
datanew = f((Ynew, Xnew))
else:
print('quadrants must be \'13\' or \'24\'.')
datanew = (datanew + datanew[::-1, ::-1]) / 2.
return datanew[ny:, nx:], xnew[nx:], ynew[ny:]
[docs]
def filled2d(data: np.ndarray, x: np.ndarray, y: np.ndarray, n: int = 1,
**kwargs) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Fill 2D data, 1D x, and 1D y by a factor of n using RGI.
Args:
data (np.ndarray): 2D or 3D array.
x (np.ndarray): 1D array.
y (np.ndarray): 1D array.
n (int, optional): How many times more the new grid is. Defaults to 1.
Returns:
tuple: The interpolated (data, x, y).
"""
xnew = np.linspace(x[0], x[-1], n * (len(x) - 1) + 1)
ynew = np.linspace(y[0], y[-1], n * (len(y) - 1) + 1)
d = RGIxy(y, x, data, np.meshgrid(ynew, xnew, indexing='ij'),
**kwargs)
return d, xnew, ynew
def _need_multipixels(method):
def wrapper(self, *args, **kwargs):
singlepixel = self.dx is None or self.dy is None
if singlepixel:
print('No pixel size.')
return
return method(self, *args, **kwargs)
return wrapper
[docs]
@dataclass
class AstroData():
"""Data to be processed and parameters for processing the data.
Args:
data (np.ndarray, optional): 2D or 3D array. Defaults to None.
x (np.ndarray, optional): 1D array. Defaults to None.
y (np.ndarray, optional): 1D array. Defaults to None.
v (np.ndarray, optional): 1D array. Defaults to None.
beam (np.ndarray, optional): [bmaj, bmin, bpa]. Defaults ot [None, None, None].
fitsimage (str, optional): Input fits name. Defaults to None.
Tb (bool, optional): True means the data array is brightness temperature. Defaults to False.
sigma (float or str, optional): Noise level or method for measuring it. Defaults to 'hist'.
center (str, optional): Text coordinates. 'common' means initialized value. Defaults to 'common'.
restfreq (float, optional): Used for velocity and brightness temperature. Defaults to None.
cfactor (float, optional): The data array is multiplied by cfactor. Defaults to 1.
pvpa (float, optional): Position angle of the PV cut. Defaults to None.
pv (bool, optional): True means the data array is a position-velocity diagram. Defaults to False.
bunit (str, optional): The unit of the data array. Defaults to ''.
"""
data: np.ndarray | None = None
x: np.ndarray | None = None
y: np.ndarray | None = None
v: np.ndarray | None = None
beam: np.ndarray | tuple[None] = (None, None, None)
fitsimage: str | None = None
Tb: bool = False
sigma: str | float | None = 'hist'
center: str = 'common'
restfreq: float | None = None
cfactor: float = 1
pvpa: float | None = None
pv: bool = False
bunit: str = ''
def __post_init__(self):
n = 0
if self.fitsimage is not None:
if type(self.fitsimage) is not list:
n = 1
elif any(a is not None for a in self.fitsimage):
n = len(self.fitsimage)
else:
n = 0
if n > 0:
self.data = None
if self.data is not None:
if type(self.data) is not list:
n = 1
elif any(a is not None for a in self.data):
n = len(self.data)
else:
n = 0
if n == 0:
print('Either data or fitsimage must be given.')
self.n = n
self.fitsimage_org = None
self.sigma_org = None
self.beam_org = None
self.fitsheader = None
[docs]
def binning(self, width: list[int, int, int] = [1, 1, 1]):
"""Binning up neighboring pixels in the v, y, and x domain.
Args:
width (list, optional): Number of channels, y-pixels, and x-pixels for binning. Defaults to [1, 1, 1].
"""
w = [1] * (4 - len(width)) + list(width)
d = to4dim(self.data)
size = np.array(np.shape(d))
w = np.array(w, dtype=int)
if np.any(w > size):
w = np.minimum(w, size)
ws = ', '.join([f'{s:d}' for s in w[1:]])
print(f'width was changed to [{ws}].')
newsize = size // w
if w[1] > 1:
print(f'sigma has been divided by sqrt({w[1]:d})'
+ ' because of binning in the v-axis.')
self.sigma = self.sigma / np.sqrt(w[1])
if w[2] > 1 or w[3] > 1:
print('Binning in the x- or y-axis does not update sigma.')
grid = [None, self.v, self.y, self.x]
dgrid = [None, self.dv, self.dy, self.dx]
for n in range(1, 4):
if w[n] == 1 or grid[n] is None:
continue
if dgrid[n] is None:
t = ['v', 'y', 'x'][n - 1]
s = f'Skip binning in the {t}-axis' \
+ f' because d{t} is None.'
warnings.warn(s, UserWarning)
continue
size[n] = newsize[n]
olddata = np.moveaxis(d, n, 0)
newdata = np.moveaxis(np.zeros(size), n, 0)
t = np.zeros(newsize[n])
for i in range(w[n]):
i_stop = i + newsize[n] * w[n]
i_step = w[n]
t += grid[n][i:i_stop:i_step]
newdata += olddata[i:i_stop:i_step]
grid[n] = t / w[n]
dgrid[n] = dgrid[n] * w[n]
d = np.moveaxis(newdata, 0, n) / w[n]
self.data = np.squeeze(d)
_, self.v, self.y, self.x = grid
_, self.dv, self.dy, self.dx = dgrid
[docs]
def centering(self, includexy: bool = True,
includev: bool = False,
**kwargs):
"""Spatial regridding to set the center at (x,y,v)=(0,0,0).
Args:
includexy (bool, optional): Centering in the x and y directions at each channel. Defaults to True.
includev (bool, optional): Centering in the v direction at each position. Defaults to False.
"""
if includexy:
xnew = self.x - self.x[nearest_index(self.x)]
ynew = self.y - self.y[nearest_index(self.y)]
if includev:
vnew = self.v - self.v[nearest_index(self.v)]
if includexy and includev:
self.data = RGIxyv(self.v, self.y, self.x, self.data,
np.meshgrid(vnew, ynew, xnew, indexing='ij'),
**kwargs)
self.v, self.y, self.x = vnew, ynew, xnew
elif includexy:
self.data = RGIxy(self.y, self.x, self.data,
np.meshgrid(ynew, xnew, indexing='ij'),
**kwargs)
self.y, self.x = ynew, xnew
elif includev:
nx, ny, nv = len(self.x), len(self.y), len(self.v)
a = np.empty((ny, nx, nv))
for i in range(ny):
for j in range(nx):
f = RGI((self.v,), self.data[:, i, j], method='linear',
bounds_error=False, fill_value=np.nan)
a[i, j] = f(vnew)
self.data = np.moveaxis(a, -1, 0)
self.v = vnew
else:
print('No change because includexy=False and includev=False.')
[docs]
@_need_multipixels
def circularbeam(self):
"""Make the beam circular by convolving with 1D Gaussian
"""
if None in self.beam:
print('No beam.')
return
bmaj, bmin, bpa = self.beam
self.rotate(-bpa)
nx = len(self.x) if len(self.x) % 2 == 1 else len(self.x) - 1
ny = len(self.y) if len(self.y) % 2 == 1 else len(self.y) - 1
y = np.linspace(-(ny-1) / 2, (ny-1) / 2, ny) * np.abs(self.dy)
g1 = np.exp(-4*np.log(2) * y**2 / (bmaj**2 - bmin**2))
e = np.sqrt(1 - bmin**2 / bmaj**2)
g1 /= np.sqrt(np.pi / 4 / np.log(2) * bmin * e)
g = np.zeros((ny, nx))
g[:, (nx - 1) // 2] = g1
d = self.data.copy()
d[np.isnan(d)] = 0
d = to4dim(d)
d = [[convolve(c, g, mode='same') for c in cc] for cc in d]
self.data = np.squeeze(d)
self.rotate(bpa)
self.beam[1] = self.beam[0]
self.beam[2] = 0
[docs]
def deproject(self, pa: float = 0, incl: float = 0, **kwargs):
"""Exapnd by a factor of 1/cos(incl) in the direction of pa+90 deg.
Args:
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
incl (float, optional): Inclination angle in the unit of degree. Defaults to 0.
"""
ci = np.cos(np.radians(incl))
A = np.linalg.multi_dot([Mrot(pa), Mfac(1, ci), Mrot(-pa)])
yxnew = dot2d(A, np.meshgrid(self.y, self.x, indexing='ij'))
self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs)
if None not in self.beam:
bmaj, bmin, bpa = self.beam
a, b = np.linalg.multi_dot([Mfac(1/bmaj, 1/bmin), Mrot(pa-bpa),
Mfac(1, ci), Mrot(-pa)]).T
alpha = (np.dot(a, a) + np.dot(b, b)) / 2
beta = np.dot(a, b)
gamma = (np.dot(a, a) - np.dot(b, b)) / 2
bpa_new = np.arctan(beta / gamma) / 2 * np.degrees(1)
if beta * bpa_new >= 0:
bpa_new += 90
Det = np.sqrt(beta**2 + gamma**2)
bmaj_new = 1 / np.sqrt(alpha - Det)
bmin_new = 1 / np.sqrt(alpha + Det)
self.beam = np.array([bmaj_new, bmin_new, bpa_new])
[docs]
@_need_multipixels
def fit2d(self, model: object, bounds: np.ndarray,
progressbar: bool = False,
kwargs_fit: dict = {}, kwargs_plotcorner: dict = {},
chan: int | None = None):
"""Fit a given 2D model function to self.data.
Args:
model (function): The model function in the form of f(par, x, y).
bounds (np.ndarray): bounds for fitting_utils.EmceeCorner.
progressbar (bool, optional): progressbar for fitting_utils.EmceeCorner. Defaults to False.
kwargs_fit (dict, optional): Arguments for fitting_utils.EmceeCorner.fit.
kwargs_plotcorner (dict, optional): Arguments for fitting_utils.EmceeCorner.plotcorner.
chan (int, optional): The channel number where the 2D model is fitted. Defaults to None.
Returns:
dict: The parameter sets (popt, plow, pmid, and phigh), the best 2D model array (model), and the residual from the model (residual).
"""
d = self.data if chan is None else self.data[chan]
x, y = np.meshgrid(self.x, self.y)
if None not in self.beam:
Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2)
pixelperbeam = Omega / np.abs(self.dx * self.dy)
else:
pixelperbeam = 1.
s = 'sigma is multiplied by sqrt(pixel-per-beam)' \
+ ' to consider the noise correlation in a beam.' \
+ ' This correction is relatively conservative.'
warnings.warn(s, UserWarning)
def logl(p):
rss = np.nansum((model(p, x, y) - d)**2)
return -0.5 * rss / self.sigma**2 / pixelperbeam
mcmc = EmceeCorner(bounds=bounds, logl=logl,
progressbar=progressbar)
kwargs_fit0 = {}
kwargs_fit0.update(kwargs_fit)
mcmc.fit(**kwargs_fit0)
kwargs_plotcorner0 = {'show': False, 'savefig': None}
kwargs_plotcorner0.update(kwargs_plotcorner)
kw_pl = kwargs_plotcorner0
if kw_pl['show'] or kw_pl['savefig'] is not None:
mcmc.plotcorner(**kw_pl)
popt = mcmc.popt
plow = mcmc.plow
pmid = mcmc.pmid
phigh = mcmc.phigh
modelopt = model(popt, x, y)
residual = d - modelopt
return {'popt': popt, 'plow': plow, 'pmid': pmid, 'phigh': phigh,
'model': modelopt, 'residual': residual}
[docs]
@_need_multipixels
def gaussfit2d(self, chan: int | None = None) -> dict:
"""Fit a 2D Gaussian function to self.data using fitting_utils.gaussfit2d().
Args:
chan (int): The channel number where the 2D Gaussian is fitted. Defaults to None.
Returns:
dict: The best parameter set (popt), the error set (perr), the best 2D Gaussian array (model), the residual from the model (residual), and the coordinates of the best-fit center (center).
"""
z = self.data if chan is None else self.data[chan]
Omega = np.pi * self.beam[0] * self.beam[1] / 4 / np.log(2)
pixelperbeam = Omega / np.abs(self.dx * self.dy)
s = 'sigma is multiplied by sqrt(pixel-per-beam)' \
+ ' to consider the noise correlation in a beam.' \
+ ' This correction is relatively conservative.'
warnings.warn(s, UserWarning)
res = gaussfit2d(xdata=self.x, ydata=self.y, zdata=z,
sigma=self.sigma * np.sqrt(pixelperbeam),
show=False, nwalkersperdim=4)
popt, perr = res['popt'], res['perr']
model = gaussian2d(np.meshgrid(self.x, self.y), *popt)
residual = z - model
if self.center is not None:
newcenter = xy2coord(popt[1:3] / 3600, coordorg=self.center)
else:
newcenter = None
return {'popt': popt, 'perr': perr,
'model': model, 'residual': residual,
'center': newcenter}
[docs]
def histogram(self, **kwargs) -> tuple:
"""Output histogram of self.data using numpy.histogram. This method can take the arguments of numpy.histogram.
Returns:
tuple: (bins, histogram)
"""
hist, hbin = np.histogram(self.data[~np.isnan(self.data)],
**kwargs)
hbin = (hbin[:-1] + hbin[1:]) / 2
return hbin, hist
[docs]
def mask(self, dataformask: np.ndarray | None = None,
includepix: list[float, float] = [],
excludepix: list[float, float] = []):
"""Mask self.data using a 2D or 3D array of dataformask.
Args:
dataformask (np.ndarray, optional): 2D or 3D array is used for specifying the mask.
includepix (list, optional): Data in this range survivies. Defaults to [].
excludepix (list, optional): Data in this range is masked. Defaults to [].
"""
if dataformask is None:
dataformask = self.data
if np.ndim(self.data) > np.ndim(dataformask):
print('The mask is broadcasted.')
mask = np.full(np.shape(self.data), dataformask)
else:
mask = dataformask
if np.shape(self.data) != np.shape(mask):
print('The dataformask has a different shape.')
return
if len(includepix) == 2:
self.data[(mask < includepix[0]) + (includepix[1] < mask)] = np.nan
if len(excludepix) == 2:
self.data[(excludepix[0] < mask) * (mask < excludepix[1])] = np.nan
[docs]
def profile(self, coords: list[str] = [],
xlist: list[float] = [], ylist: list[float] = [],
ellipse: list[float, float, float] | None = None,
ninterp: int = 1,
flux: bool = False, gaussfit: bool = False
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Get a list of line profiles at given spatial coordinates.
Args:
coords (list, optional): Text coordinates. Defaults to [].
xlist (list, optional): Offset from center. Defaults to [].
ylist (list, optional): Offset from center. Defaults to [].
ellipse (list, optional): [major, minor, pa]. For average. Defaults to None.
ninterp (int, optional): Number of points for interpolation. Defaults to 1.
flux (bool, optional): Jy/beam to Jy. Defaults to False.
gaussfit (bool, optional): Fit the profiles. Defaults to False.
Returns:
tuple: (v, list of profiles, result of Gaussian fit)
"""
if np.ndim(self.data) != 3 or self.v is None:
print('Data must be 3D with the v, y, and x axes.')
return
v = self.v
data, xf, yf = filled2d(self.data, self.x, self.y, ninterp)
x, y = np.meshgrid(xf, yf)
if len(coords) > 0:
xlist, ylist = coord2xy(coords, self.center) * 3600.
nprof = len(xlist)
prof = np.empty((nprof, len(v)))
ellipse = ellipse or [[0, 0, 0]] * nprof
calc = np.sum if flux else np.mean
for i, (xc, yc, e) in enumerate(zip(xlist, ylist, ellipse)):
major, minor, pa = e
z = dot2d(Mrot(-pa), [y - yc, x - xc])
if major == 0 or minor == 0:
r = np.hypot(*z)
idx = np.unravel_index(np.argmin(r), np.shape(r))
prof[i] = [d[idx] for d in data]
else:
r = np.hypot(*dot2d(Mfac(2 / major, 2 / minor), z))
prof[i] = [calc(d[r <= 1]) for d in data]
if flux:
if None in self.beam or None in [self.dx, self.dy]:
print('None in beam, dx, or dy. Flux is not converted.')
else:
Omega = np.pi * self.beam[0] * self.beam[1] / 4. / np.log(2.)
prof *= np.abs(self.dx * self.dy) / Omega
gfitres = {}
if gaussfit:
res = [None] * nprof
for i in range(nprof):
res[i] = gaussfit1d(xdata=v, ydata=prof[i],
sigma=None, show=True,
nwalkersperdim=8)
gfitres['best'] = [a['popt'][:3] for a in res]
gfitres['error'] = [a['perr'][:3] for a in res]
return v, prof, gfitres
[docs]
def rotate(self, pa: float = 0, **kwargs):
"""Counter clockwise rotation with respect to the center.
Args:
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
"""
yxnew = dot2d(Mrot(-pa), np.meshgrid(self.y, self.x, indexing='ij'))
self.data = RGIxy(self.y, self.x, self.data, yxnew, **kwargs)
if self.beam[2] is not None:
self.beam[2] = self.beam[2] + pa
[docs]
def slice(self, length: float = 0, pa: float = 0,
dx: float | None = None, **kwargs) -> np.ndarray:
"""Get 1D slice with given a length and a position-angle.
Args:
length (float, optional): Slice line length. Defaults to 0.
pa (float, optional): Position angle in the unit of degree. Defaults to 0.
dx (float, optional): Grid increment. Defaults to None.
Returns:
np.ndarray: [x, data]. If self.data is 3D, the output data are in the shape of (len(v), len(x)).
"""
if dx is None and self.dx is not None:
dx = np.abs(self.dx)
if dx is None:
print('dx was not found. Please input dx.')
return
n = int(np.ceil(length / 2 / dx))
r = np.linspace(-n, n, 2 * n + 1) * dx
pa_rad = np.radians(pa)
yg, xg = r * np.cos(pa_rad), r * np.sin(pa_rad)
z = RGIxy(self.y, self.x, self.data, (yg, xg), **kwargs)
return np.array([r, z])
[docs]
def todict(self) -> dict:
"""Output the attributes as a dictionary that can be input to PlotAstroData.
Returns:
dict: Output that can be input to PlotAstroData.
"""
d = {'data': self.data, 'x': self.x, 'y': self.y, 'v': self.v,
'fitsimage': self.fitsimage, 'beam': self.beam, 'Tb': self.Tb,
'restfreq': self.restfreq, 'cfactor': self.cfactor,
'sigma': self.sigma, 'center': self.center, 'pv': self.pv,
'bunit': self.bunit}
return d
[docs]
@_need_multipixels
def writetofits(self, fitsimage: str = 'out.fits',
header: dict = {}) -> None:
"""Write out the AstroData to a FITS file.
Args:
fitsimage (str, optional): Output FITS file name. Defaults to 'out.fits'.
header (dict, optional): Header dictionary. Defaults to {}.
"""
fhd = self.fitsheader
h = {}
ci = nearest_index(self.x)
cx = 0
if not self.pv:
cj = nearest_index(self.y)
if self.center is None:
cx, cy = self.x[ci], self.x[cj]
else:
xy = [self.x[ci] / 3600, self.y[cj] / 3600]
cx, cy = coord2xy(xy2coord(xy, self.center))
def indeg(s):
u = f'CUNIT{s}'
return fhd is None or u not in fhd or isdeg(fhd[u])
h['NAXIS1'] = len(self.x)
h['CRPIX1'] = int(ci + 1)
h['CRVAL1'] = float(cx)
h['CDELT1'] = float(self.dx / (3600 if indeg('1') else 1))
if self.dv is not None:
vaxis = '2' if self.pv else '3'
ck = nearest_index(self.v)
cv = self.v[ck]
dv = self.dv
if self.restfreq is None or self.restfreq == 0:
s = 'No valid restfreq.' \
+ f' Axis {vaxis} is saved as is.'
warnings.warn(s, UserWarning)
else:
cv = (1 - cv / cu.c_kms) * self.restfreq
dv = -dv / cu.c_kms * self.restfreq
h[f'NAXIS{vaxis}'] = len(self.v)
h[f'CRPIX{vaxis}'] = int(ck + 1)
h[f'CRVAL{vaxis}'] = float(cv)
h[f'CDELT{vaxis}'] = float(dv)
if not self.pv:
h['NAXIS2'] = len(self.y)
h['CRPIX2'] = int(cj + 1)
h['CRVAL2'] = float(cy)
h['CDELT2'] = float(self.dy / (3600 if indeg('2') else 1))
if None not in self.beam:
beam = self.beam_org if self.pv else self.beam
h['BMAJ'] = float(beam[0] / 3600)
h['BMIN'] = float(beam[1] / 3600)
h['BPA'] = float(beam[2])
h.update(header)
data2fits(d=self.data, h=h, templatefits=self.fitsimage_org,
fitsimage=fitsimage)
[docs]
@dataclass
class AstroFrame():
"""Parameter set to limit and reshape the data in the AstroData format.
Args:
vmin (float, optional): Velocity at the upper left. Defaults to -1e10.
vmax (float, optional): Velocity at the lower bottom. Defaults to 1e10.
vsys (float, optional): Each channel shows v-vsys. Defaults to 0..
center (str, optional): Central coordinate like '12h34m56.7s 12d34m56.7s'. Defaults to None.
fitsimage (str, optional): Fits to get center. Defaults to None.
rmax (float, optional): The x range is [-rmax, rmax]. The y range is [-rmax, ramx]. Defaults to 1e10.
xmax (float, optional): The x range is [xmin, xmax]. Defaults to None.
xmin (float, optional): The x range is [xmin, xmax]. Defaults to None.
ymax (float, optional): The y range is [ymin, ymax]. Defaults to None.
ymin (float, optional): The y range is [ymin, ymax]. Defaults to None.
dist (float, optional): Change x and y in arcsec to au. Defaults to 1..
xoff (float, optional): Map center relative to the center. Defaults to 0.
yoff (float, optional): Map center relative to the center. Defaults to 0.
xflip (bool, optional): True means left is positive x. Defaults to True.
yflip (bool, optional): True means bottom is positive y. Defaults to False.
swapxy (bool, optional): True means x and y are swapped. Defaults to False.
pv (bool, optional): Mode for PV diagram. Defaults to False.
quadrants (str, optional): '13' or '24'. Quadrants to take mean. None means not taking mean. Defaults to None.
"""
rmax: float = 1e10
xmax: float | None = None
xmin: float | None = None
ymax: float | None = None
ymin: float | None = None
dist: float = 1
center: str | None = None
fitsimage: str | None = None
xoff: float = 0
yoff: float = 0
vsys: float = 0
vmin: float = -1e20
vmax: float = 1e20
xflip: bool = True
yflip: bool = False
swapxy: bool = False
pv: bool = False
quadrants: str | None = None
def __post_init__(self):
self.xdir = -1 if self.xflip else 1
self.ydir = -1 if self.yflip else 1
self.xmin = -self.rmax if self.xmin is None else self.xmin
self.xmax = self.rmax if self.xmax is None else self.xmax
self.ymin = -self.rmax if self.ymin is None else self.ymin
self.ymax = self.rmax if self.ymax is None else self.ymax
if self.xflip:
self.xmin, self.xmax = self.xmax, self.xmin
if self.yflip:
self.ymin, self.ymax = self.ymax, self.ymin
xlim = [self.xoff + self.xmin, self.xoff + self.xmax]
ylim = [self.yoff + self.ymin, self.yoff + self.ymax]
vlim = [self.vmin, self.vmax]
if self.pv:
xlim = sorted(xlim)
if not self.xflip:
xlim.reverse()
self.xlim, self.ylim, self.vlim = xlim, ylim, vlim
_x, _y = (xlim, vlim) if self.pv else (xlim, ylim)
self.Xlim, self.Ylim = (_y, _x) if self.swapxy else (_x, _y)
if self.quadrants is not None:
self.Xlim = [0, self.rmax]
self.Ylim = [0, min(self.vmax, -self.vmin)]
if self.fitsimage is not None and self.center is None:
self.center = FitsData(self.fitsimage).get_center()
[docs]
def pos2xy(self, poslist: list[str | list[float, float]] = []
) -> np.ndarray:
"""Text or relative to absolute coordinates.
Args:
poslist (list, optional): Text coordinates or relative coordinates. Defaults to [].
Returns:
np.ndarray: absolute coordinates.
"""
onexy = np.shape(poslist) == (2,) and type(poslist[0]) is not str
if np.shape(poslist) == () or onexy:
poslist = [poslist]
x, y = [None] * len(poslist), [None] * len(poslist)
for i, p in enumerate(poslist):
if type(p) is str:
x[i], y[i] = coord2xy(p, self.center) * 3600.
else:
x[i], y[i] = rel2abs(*p, self.Xlim, self.Ylim)
return np.array([x, y])
[docs]
def read(self, d: AstroData, xskip: int = 1, yskip: int = 1):
"""Get data, grid, sigma, beam, and bunit from AstroData, which is a part of the input of add_color, add_contour, add_segment, and add_rgb.
Args:
d (AstroData): Dataclass for the add_* input.
xskip, yskip (int): Spatial pixel skip. Defaults to 1.
"""
if type(d.fitsimage) is not list:
d.fitsimage = [d.fitsimage] * d.n
if type(d.data) is not list:
d.data = [d.data] * d.n
if np.ndim(d.beam) == 1:
d.beam = [d.beam] * d.n
if type(d.Tb) is not list:
d.Tb = [d.Tb] * d.n
if type(d.sigma) is not list:
d.sigma = [d.sigma] * d.n
if type(d.center) is not list:
d.center = [d.center] * d.n
if type(d.restfreq) is not list:
d.restfreq = [d.restfreq] * d.n
if type(d.cfactor) is not list:
d.cfactor = [d.cfactor] * d.n
if type(d.bunit) is not list:
d.bunit = [d.bunit] * d.n
if type(d.fitsimage_org) is not list:
d.fitsimage_org = [d.fitsimage_org] * d.n
if type(d.sigma_org) is not list:
d.sigma_org = [d.sigma_org] * d.n
if type(d.beam_org) is not list:
d.beam_org = [d.beam_org] * d.n
if type(d.fitsheader) is not list:
d.fitsheader = [d.fitsheader] * d.n
if type(d.pv) is not list:
d.pv = [d.pv] * d.n
if type(d.pvpa) is not list:
d.pvpa = [d.pvpa] * d.n
grid0 = [d.x, d.y, d.v]
for i in range(d.n):
if d.center[i] == 'common':
d.center[i] = self.center
grid = grid0.copy()
if d.fitsimage[i] is not None:
fd = FitsData(d.fitsimage[i])
if d.fitsheader[i] is None:
d.fitsheader[i] = fd.get_header()
if d.center[i] is None and not self.pv:
d.center[i] = fd.get_center()
if d.restfreq[i] is None:
h = d.fitsheader[i]
if 'NAXIS3' in h and h['NAXIS3'] == 1 and not self.pv:
d.restfreq[i] = h['CRVAL3']
elif 'RESTFRQ' in h:
d.restfreq[i] = h['RESTFRQ']
elif 'RESTFREQ' in h:
d.restfreq[i] = h['RESTFREQ']
d.data[i] = fd.get_data()
grid = fd.get_grid(center=d.center[i], dist=self.dist,
restfreq=d.restfreq[i], vsys=self.vsys,
pv=self.pv)
if fd.wcsrot:
d.center[i] = fd.get_center() # for WCS rotation
d.beam[i] = fd.get_beam(dist=self.dist)
d.bunit[i] = fd.get_header('BUNIT')
if d.data[i] is not None:
d.sigma_org[i] = d.sigma[i]
d.sigma[i] = estimate_rms(d.data[i], d.sigma[i])
diffcent = (not self.pv
and self.center is not None
and d.center[i] is not None
and d.center[i] != self.center)
if diffcent:
cx, cy = coord2xy(d.center[i], self.center) * 3600
grid[0] = grid[0] + cx # Don't use += cx.
grid[1] = grid[1] + cy # Don't use += cy.
d.center[i] = self.center
d.data[i], grid = trim(data=d.data[i],
x=grid[0], y=grid[1], v=grid[2],
xlim=self.xlim, ylim=self.ylim,
vlim=self.vlim, pv=self.pv)
v = grid[2]
has_v = v is not None and len(v) > 1
if has_v and v[1] < v[0]:
d.data[i], v = d.data[i][::-1], v[::-1]
print('Velocity has been inverted.')
d.v = v
d.dv = v[1] - v[0] if has_v else None
grid = grid[:3:2] if self.pv else grid[:2]
if self.swapxy:
grid = [grid[1], grid[0]]
d.data[i] = np.moveaxis(d.data[i], 1, 0)
grid[0] = grid[0][::xskip]
grid[1] = grid[1][::yskip]
a = d.data[i]
a = np.moveaxis(a, [-2, -1], [0, 1])
a = a[::yskip, ::xskip]
a = np.moveaxis(a, [0, 1], [-2, -1])
d.data[i] = a
x, y = d.x, d.y = grid
has_x = x is not None and len(x) > 1
d.dx = x[1] - x[0] if has_x else None
has_y = y is not None and len(y) > 1
d.dy = y[1] - y[0] if has_y else None
if self.quadrants is not None:
d.data[i], d.x, d.y \
= quadrantmean(d.data[i], d.x, d.y, self.quadrants)
d.data[i] = d.data[i] * d.cfactor[i]
if d.sigma[i] is not None:
d.sigma[i] = d.sigma[i] * d.cfactor[i]
if d.Tb[i]:
dx = d.dy if self.swapxy else d.dx
header = {'CDELT1': dx / 3600,
'CUNIT1': 'DEG',
'RESTFREQ': d.restfreq[i]}
if None not in d.beam[i]:
header['BMAJ'] = d.beam[i][0] / 3600 / self.dist
header['BMIN'] = d.beam[i][1] / 3600 / self.dist
d.data[i] = d.data[i] * Jy2K(header=header)
d.sigma[i] = d.sigma[i] * Jy2K(header=header)
if self.pv and not d.pv[i] and None not in d.beam[i]:
bmaj, bmin, bpa = d.beam_org[i] = d.beam[i]
if d.pvpa[i] is None:
d.pvpa[i] = bpa
print('pvpa is not specified. pvpa=bpa is assumed.')
p = np.radians(bpa - d.pvpa[i])
b = 1 / np.hypot(np.cos(p) / bmaj, np.sin(p) / bmin)
d.beam[i] = np.array([np.abs(d.dv), b, 0])
d.pv[i] = self.pv
d.Tb[i] = False
d.cfactor[i] = 1
if d.fitsimage[i] is not None:
d.fitsimage_org[i] = d.fitsimage[i]
d.fitsimage[i] = None
if d.n == 1:
d.data = d.data[0]
d.beam = d.beam[0]
d.fitsimage = d.fitsimage[0]
d.Tb = d.Tb[0]
d.sigma = d.sigma[0]
d.center = d.center[0]
d.restfreq = d.restfreq[0]
d.cfactor = d.cfactor[0]
d.bunit = d.bunit[0]
d.fitsimage_org = d.fitsimage_org[0]
d.sigma_org = d.sigma_org[0]
d.beam_org = d.beam_org[0]
d.fitsheader = d.fitsheader[0]
d.pv = d.pv[0]
d.pvpa = d.pvpa[0]