Source code for plotastrodata.fits_utils

import numpy as np
from astropy import units, wcs
from astropy.io import fits

from plotastrodata import const_utils as cu
from plotastrodata.coord_utils import coord2xy, xy2coord
from plotastrodata.matrix_utils import dot2d
from plotastrodata.noise_utils import estimate_rms
from plotastrodata.other_utils import isdeg, RGIxy, trim


[docs] def Jy2K(header=None, bmaj: float | None = None, bmin: float | None = None, restfreq: float | None = None) -> float: """Calculate a conversion factor in the unit of K/Jy. Args: header (optional): astropy.io.fits.open('a.fits')[0].header. Defaults to None. bmaj (float, optional): beam major axis in degree. Defaults to None. bmin (float, optional): beam minor axis in degree. Defaults to None. freq (float, optional): rest frequency in Hz. Defaults to None. Returns: float: the conversion factor in the unit of K/Jy. """ freq = None if header is not None: if 'BMAJ' in header and 'BMIN' in header: bmaj, bmin = header['BMAJ'] * 3600, header['BMIN'] * 3600 else: print('Use CDELT1^2 for Tb conversion.') todiameter = np.sqrt(4 * np.log(2) / np.pi) * 3600 bmaj = bmin = header['CDELT1'] * todiameter if header['CUNIT1'] == 'arcsec': bmaj, bmin = bmaj / 3600, bmin / 3600 if 'RESTFREQ' in header: freq = header['RESTFREQ'] if 'RESTFRQ' in header: freq = header['RESTFRQ'] if restfreq is not None: freq = restfreq if freq is None: print('Please input restfreq.') return omega = bmaj * bmin * units.arcsec**2 * np.pi / 4. / np.log(2.) equiv = units.brightness_temperature(freq * units.Hz, beam_area=omega) T = (1 * units.Jy / units.beam).to(units.K, equivalencies=equiv) return T.value
[docs] class FitsData: """For practical treatment of data in a FITS file. Args: fitsimage (str): Input FITS file name. """ def __init__(self, fitsimage: str): self.fitsimage = fitsimage
[docs] def gen_hdu(self) -> None: """Generate self.hdu, which is astropy,io.fits.open()[0]. """ hdu = fits.open(self.fitsimage) self.hdu = hdu[0] if 'BEAMS' in hdu: print('Beam table found in HDU list. Use median beam.') b = hdu['BEAMS'].data area = b['BMAJ'] * b['BMIN'] # arcsec^2? imed = np.nanargmin(np.abs(area - np.nanmedian(area))) self.hdubeam = b['BMAJ'][imed], b['BMIN'][imed], b['BPA'][imed]
[docs] def gen_header(self) -> None: """Generate self.header., which is astropy.io.fits.open()[0].header. """ if not hasattr(self, 'hdu'): self.gen_hdu() self.header = self.hdu.header
[docs] def get_header(self, key: str | None = None) -> dict | float: """Output the entire header or a value when a key is given. Args: key (str, optional): Key name of the FITS header. Defaults to None. Returns: dict or float: The entire header or a value. """ if not hasattr(self, 'header'): self.gen_header() if key is None: return self.header if key in self.header: return self.header[key] print(f'{key} is not in the header.') return None
[docs] def gen_beam(self, dist: float = 1.) -> None: """Generate sef.bmaj, self.bmin, self.bpa from header['BMAJ'], etc. Args: dist (float, optional): bmaj and bmin are multiplied by dist. Defaults to 1.. """ if hasattr(self, 'hdubeam'): bmaj, bmin, bpa = self.hdubeam bmaj = bmaj * dist bmin = bmin * dist else: bmaj = self.get_header('BMAJ') bmin = self.get_header('BMIN') bpa = self.get_header('BPA') if bmaj is not None: bmaj = bmaj * 3600 * dist if bmin is not None: bmin = bmin * 3600 * dist self.bmaj, self.bmin, self.bpa = bmaj, bmin, bpa
[docs] def get_beam(self, dist: float = 1.) -> np.ndarray: """Output the beam array of [bmaj, bmin, bpa]. Args: dist (float, optional): bmaj and bmin are multiplied by dist. Defaults to 1.. Returns: np.ndarray: [bmaj, bmin, bpa]. """ if not hasattr(self, 'bmaj'): self.gen_beam(dist) return np.array([self.bmaj, self.bmin, self.bpa])
[docs] def get_center(self) -> str: """Output the central coordinates as text. Returns: str: The central coordinates. """ cunit1 = self.get_header('CUNIT1') cunit2 = self.get_header('CUNIT2') if not (isdeg(cunit1) and isdeg(cunit2)): print(f'CUNIT1=\'{cunit1}\' and CUNIT2=\'{cunit2}\'. \'center\' is ignored.') return None ra_deg = self.get_header('CRVAL1') dec_deg = self.get_header('CRVAL2') radesys = self.get_header('RADESYS') a = xy2coord([ra_deg, dec_deg]) if radesys is not None: a = f'{radesys} {a}' return a
[docs] def gen_data(self, Tb: bool = False, log: bool = False, drop: bool = True, restfreq: float = None) -> None: """Generate data, which may be brightness temperature. Args: Tb (bool, optional): True means the data are brightness temperatures. Defaults to False. log (bool, optional): True means the data are after taking the logarithm to the base 10. Defaults to False. drop (bool, optional): True means the data are after using np.squeeze. Defaults to True. restfreq (float, optional): Rest frequency for calculating the brightness temperature. Defaults to None. """ self.data = None if not hasattr(self, 'hdu'): self.gen_hdu() h, d = self.hdu.header, self.hdu.data if drop: d = np.squeeze(d) if Tb: d *= Jy2K(header=h, restfreq=restfreq) if log: d = np.log10(d.clip(np.min(d[d > 0]), None)) self.data = d
[docs] def get_data(self, **kwargs) -> np.ndarray: """Output data. This method can take the arguments of gen_data(). Returns: np.ndarray: data in the format of np.ndarray. """ if not hasattr(self, 'data'): self.gen_data(**kwargs) return self.data
[docs] def gen_grid(self, center: str | None = None, dist: float = 1., restfreq: float | None = None, vsys: float = 0., pv: bool = False) -> None: """Generate grids relative to the center and vsys. Args: center (str, optional): Center for the spatial grids. Defaults to None. dist (float, optional): The spatial grids are multiplied by dist. Defaults to 1.. restfreq (float, optional): Rest frequency for converting the frequencies to velocities. Defaults to None. vsys (float, optional): The velocity is relative to vsys. Defaults to 0.. pv (bool, optional): Mode for position-velocity diagram. Defaults to False. """ h = self.get_header() # WCS rotation (Calabretta & Greisen 2002, Astronomy & Astrophysics, 395, 1077) self.wcsrot = False cdij = ['CD1_1', 'CD1_2', 'CD2_1', 'CD2_2'] if np.all([s in list(h.keys()) for s in cdij]): self.wcsrot = True cd11, cd12, cd21, cd22 = [h[s] for s in cdij] if cd21 == 0: rho_a = 0 else: rho_a = np.arctan2(np.abs(cd21), np.sign(cd21) * cd11) if cd12 == 0: rho_b = 0 else: rho_b = np.arctan2(np.abs(cd12), -np.sign(cd12) * cd22) if (drho := np.abs(np.degrees(rho_a - rho_b))) > 1.0: print('Angles from (CD21, CD11) and (CD12, CD22)' + f' are different by {drho:.2} degrees.') crota2 = (rho_a + rho_b) / 2. sin_rho = np.sin(crota2) cos_rho = np.cos(crota2) cdelt1 = cd11 * cos_rho + cd21 * sin_rho cdelt2 = -cd12 * sin_rho + cd22 * cos_rho crota2 = np.degrees(crota2) h['CDELT1'] = cdelt1 h['CDELT2'] = cdelt2 del h['CD1_1'] del h['CD1_2'] del h['CD2_1'] del h['CD2_2'] print(f'WCS rotation was found (CROTA2 = {crota2:f} deg).') # spatial center if center is None or self.wcsrot: cx, cy = 0, 0 else: c0 = xy2coord([h['CRVAL1'], h['CRVAL2']]) if 'RADESYS' in h: radesys = h['RADESYS'] c0 = f'{radesys} {c0}' cx, cy = coord2xy(center, c0) # rest frequency if restfreq is None: if 'RESTFRQ' in h: restfreq = h['RESTFRQ'] if 'RESTFREQ' in h: restfreq = h['RESTFREQ'] self.x, self.y, self.v = None, None, None self.dx, self.dy, self.dv = None, None, None def get_list(i: int, crval=False) -> np.ndarray: s = np.arange(h[f'NAXIS{i:d}']) s = (s - h[f'CRPIX{i:d}'] + 1) * h[f'CDELT{i:d}'] if crval: s = s + h[f'CRVAL{i:d}'] return s def gen_x(s_in: np.ndarray) -> None: s = (s_in - cx) * dist if isdeg(h['CUNIT1']): s *= 3600. self.x, self.dx = s, s[1] - s[0] def gen_y(s_in: np.ndarray) -> None: s = (s_in - cy) * dist if isdeg(h['CUNIT2']): s *= 3600. self.y, self.dy = s, s[1] - s[0] def gen_v(s_in: np.ndarray) -> None: if restfreq is None: freq = np.mean(s_in) print('restfreq is assumed to be the center.') else: freq = restfreq vaxis = '2' if pv else '3' key = f'CUNIT{vaxis}' cunitv = h[key] match cunitv.strip(): case 'Hz': if freq == 0: print('v is frequency because restfreq=0.') s = s_in * 1 else: s = (1 - s_in / freq) * cu.c_kms - vsys case 'HZ': if freq == 0: print('v is frequency because restfreq=0.') s = s_in * 1 else: s = (1 - s_in / freq) * cu.c_kms - vsys case 'm/s': print(f'{key}=\'m/s\' found.') s = s_in * 1e-3 - vsys case 'M/S': print(f'{key}=\'M/S\' found.') s = s_in * 1e-3 - vsys case 'km/s': print(f'{key}=\'km/s\' found.') s = s_in - vsys case 'KM/S': print(f'{key}=\'KM/S\' found.') s = s_in - vsys case _: print(f'Unknown CUNIT3 {cunitv} found.' + ' v is read as is.') s = s_in - vsys self.v, self.dv = s, s[1] - s[0] if h['NAXIS'] > 0 and h['NAXIS1'] > 1: gen_x(get_list(1)) if h['NAXIS'] > 1 and h['NAXIS2'] > 1: gen_v(get_list(2, True)) if pv else gen_y(get_list(2)) if h['NAXIS'] > 2 and h['NAXIS3'] > 1: gen_v(get_list(3, True)) if self.wcsrot: data = self.get_data() self.header['CRPIX1'] = ic = len(self.x) // 2 self.header['CRPIX2'] = jc = len(self.y) // 2 xc = self.x[ic] / self.dx yc = self.y[jc] / self.dy Mcd = [[cd11, cd12], [cd21, cd22]] xc, yc = dot2d(Mcd, [xc, yc]) newcenter = xy2coord(xy=[xc, yc], coordorg=self.get_center()) xc, yc = coord2xy(coords=newcenter, coordorg='00h00m00s 00d00m00s') self.header['CRVAL1'] = xc self.header['CRVAL2'] = yc self.x = self.x - self.x[ic] self.y = self.y - self.y[jc] x = self.x / (3600 if isdeg(h['CUNIT1']) else 1) y = self.y / (3600 if isdeg(h['CUNIT2']) else 1) X, Y = np.meshgrid(x, y) cdinv = np.linalg.inv([[cd11, cd12], [cd21, cd22]]) xnew, ynew = dot2d(cdinv, [X, Y]) datanew = RGIxy(self.y / self.dy, self.x / self.dx, data, (ynew, xnew)) self.data = datanew print('Data values were interpolated for WCS rotation.')
[docs] def get_grid(self, **kwargs) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Output the grids, [x, y, v]. This method can take the arguments of gen_grid(). Returns: tuple: (x, y, v). """ if not hasattr(self, 'x') or not hasattr(self, 'y'): self.gen_grid(**kwargs) return self.x, self.y, self.v
[docs] def trim(self, rmax: float = 1e10, xoff: float = 0., yoff: float = 0., vmin: float = -1e10, vmax: float = 1e10, pv: bool = False) -> None: """Trim the data and grids. The data range will be from xoff - rmax, yoff - rmax, vmin to xoff + rmax, yoff + rmax, vmax. Args: rmax (float, optional): Defaults to 1e10. xoff (float, optional): Defaults to 0.. yoff (float, optional): Defaults to 0.. vmin (float, optional): Defaults to -1e10. vmax (float, optional): Defaults to 1e10. pv (bool, optional): Mode for position-velocity diagram. Defaults to False. """ data = self.data if hasattr(self, 'data') else None x = self.x if hasattr(self, 'x') else None y = self.y if hasattr(self, 'y') else None v = self.v if hasattr(self, 'v') else None self.data, grid = trim(data=data, x=x, y=y, v=v, xlim=[xoff - rmax, xoff + rmax], ylim=[yoff - rmax, yoff + rmax], vlim=[vmin, vmax], pv=pv) self.x, self.y, self.v = grid
[docs] def fits2data(fitsimage: str, Tb: bool = False, log: bool = False, dist: float = 1., sigma: str | None = None, restfreq: float | None = None, center: str | None = None, vsys: float = 0., pv: bool = False, **kwargs ) -> tuple[np.ndarray, tuple[np.ndarray, np.ndarray, np.ndarray], tuple[float, float, float], float, float]: """Extract data from a fits file. kwargs are arguments of FitsData.trim(). Args: fitsimage (str): Input fits name. Tb (bool, optional): True means ouput data are brightness temperature. Defaults to False. log (bool, optional): True means output data are logarhismic. Defaults to False. dist (float, optional): Change x and y in arcsec to au. Defaults to 1.. sigma (str, optional): Noise level or method for measuring it. Defaults to None. restfreq (float, optional): Used for velocity and brightness temperature. Defaults to None. center (str, optional): Text coordinates. Defaults to None. vsys (float, optional): In the unit of km/s. Defaults to 0. pv (bool, optional): True means PV fits file. Defaults to False. Returns: tuple: (data, (x, y, v), (bmaj, bmin, bpa), bunit, rms) """ fd = FitsData(fitsimage) fd.gen_data(Tb=Tb, log=log, drop=True, restfreq=restfreq) rms = estimate_rms(fd.data, sigma) fd.gen_grid(center=center, dist=dist, restfreq=restfreq, vsys=vsys, pv=pv) fd.trim(pv=pv, **kwargs) beam = fd.get_beam(dist=dist) bunit = fd.get_header('BUNIT') return fd.data, (fd.x, fd.y, fd.v), beam, bunit, rms
[docs] def data2fits(d: np.ndarray, h: dict = {}, templatefits: str | None = None, fitsimage: str = 'test') -> None: """Make a fits file from a N-D array. Args: d (np.ndarray): N-D array. h (dict, optional): Additional FITS header. Defaults to {}. templatefits (str, optional): FITS file whose header is used as a temperate. Defaults to None. fitsimage (str, optional): Output filename, with or without '.fits'. Defaults to 'test'. """ _h = {} if templatefits is None else FitsData(templatefits).get_header() _h.update(h) naxis = np.ndim(d) w = wcs.WCS(naxis=naxis) if _h == {}: w.wcs.crpix = [0] * naxis w.wcs.crval = [0] * naxis w.wcs.cdelt = [1] * naxis defaults = {'CTYPE': ['RA---SIN', 'DEC--SIN', 'FREQ'], 'CUNIT': ['deg', 'deg', 'Hz']} for k, v in defaults.items(): for i in range(naxis): _h.setdefault(f'{k}{i+1:d}', v[i]) w.wcs.ctype = [_h[f'CTYPE{i+1}'] for i in range(naxis)] w.wcs.cunit = [_h[f'CUNIT{i+1}'] for i in range(naxis)] _h.setdefault('BUNIT', 'Jy/beam') if naxis >= 3: _h.setdefault('SPECSYS', 'LSRK') header = w.to_header() hdu = fits.PrimaryHDU(d, header=header) for k, v in _h.items(): if v is not None and 'COMMENT' not in k and 'HISTORY' not in k: hdu.header[k] = v hdu = fits.HDUList([hdu]) hdu.writeto(fitsimage.removesuffix('.fits') + '.fits', overwrite=True)