Source code for interferopy.cube

import os
from tqdm import tqdm
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy import wcs
import scipy.constants as const
import interferopy
import interferopy.tools as tools


[docs]class Cube: """ Cube object represents interferometric data such as a map (2D) or a cube (3D). Allows performing tasks such as aperture integration or spectrum extraction. It is instantiated by reading in a fits file. :Example: .. code-block:: python filename="cube.image.fits" ra, dec, freq = (205.533741, 9.477317341, 222.54) # coord for the emission line c=Cube(filename) # spectrum from the circular aperture, with associated error flux, err, _ = c.spectrum(ra=ra, dec=dec, radius=1.5, calc_error=True) # curve of growth up to the maximum radius, in steps of one pixel, in a chosen frequency channel radius, flux, err, _ = c.growing_aperture(ra=ra, dec=dec, freq=freq, maxradius=5, calc_error=True) """ def __init__(self, filename: str): """ :param filename: Path string to the fits image. """ if os.path.exists(filename): self.filename = filename self.hdu = None """Hdu of the loaded fits file.""" self.head = None """Image header of the loaded fits file.""" self.im: np.ndarray = None """Data cube indexed as im[ra, dec, freq]. This is transposed version of what the fits package returns""" self.wcs: wcs.WCS = None """Datacube world coordinate system.""" self.beam = None """Table of beams per channel, keywords: bmaj, bmin, bpa.""" self.beamvol: np.ndarray = None """Beam volume in pixels (number of pixels in a beam).""" self.pixsize: float = None """Size of the pixel in arcsec.""" self.nch: int = None """Number of channels in a cube.""" self.freqs: np.ndarray = None """Array of frequencies corresponding to cube channels.""" self.reffreq: float = None """Reference frequency in GHz (from image header).""" self.deltafreq: float = None """Channel width in GHz.""" self.__rms: np.ndarray = None """Array of rms values per channel. Computed on demand.""" self.__radesys: str = None """Reference frame. Read from header (with appropriate warnings) only when needed.""" self.__load_fitsfile() else: raise FileNotFoundError(filename) def __load_fitsfile(self): """ Read the fits file and extract common useful data into Cube class attributes. """ self.log("Open " + self.filename) self.hdu = fits.open(self.filename) self.head = self.hdu[0].header self.pixsize = self.head["cdelt2"] * 3600 # arcsec, assumes square pixels # quick fix for a faulty header card that contained \n char - remove the tag altogether if "ORIGIN" in self.head.keys(): self.head.remove('ORIGIN') # transpose the cube data so that intuitive im[ra,dec,freq,stokes] indexing can be used # number of cube dimensions naxis = len(self.hdu[0].data.shape) if naxis == 4 and self.hdu[0].data.shape[0] == 1: # drop the fourth (Stokes) axis if only a single plane is present image = self.hdu[0].data.T[:, :, :, 0] naxis = 3 elif naxis == 4: image = self.hdu[0].data.T self.log("Warning: did test 4D full polarization cubes with multiple Stokes axes.") elif naxis == 3: image = self.hdu[0].data.T elif naxis == 2: # add the third axis, if missing, because several methods require it image = self.hdu[0].data[np.newaxis, :, :].T # add also in saved header in Cube self.head['NAXIS3'] = 1 self.head['NAXIS'] = 3 self.head['CTYPE3'] = 'FREQ' self.head['CRVAL3'] = self.head['RESTFREQ'] self.head['CDELT3'] = 1 self.head['CRPIX3'] = 0 naxis = 3 self.log("Warning: 2D maps may have incorrect pixsize/beamvol. Please replace!") else: raise RuntimeError("Invalid number of cube dimensions.") self.im = image self.naxis = naxis if not np.isnan(self.im[(0,) * self.naxis]): self.log("Warning: The first voxel of the cube is not np.nan. " "Note regions with no data are assumed to have value np.nan. " "If needed, use self.im_mask_values to mask appropriately.") # save the world coord system self.wcs = wcs.WCS(self.head, naxis=self.naxis) # populate frequency details (freq array, channel size, number of channels) # convert from velocity header if necessary, scale to GHz nch = 1 if naxis >= 3: nch = self.im.shape[2] # number of (freq) channels in the cube # if frequencies are already 3rd axis if str(self.head["CTYPE3"]).strip().lower() == "freq": _, _, freqs = self.wcs.all_pix2world(int(self.im.shape[0] / 2), int(self.im.shape[1] / 2), range(nch), 0) freqs *= 1e-9 # in GHz self.deltafreq = self.head["CDELT3"] * 1e-9 self.reffreq = self.head["CRVAL3"] * 1e-9 # if frequencies are given in radio velocity, convert to freqs elif str(self.head["CTYPE3"]).strip().lower() == "vrad": _, _, vels = self.wcs.all_pix2world(int(self.im.shape[0] / 2), int(self.im.shape[1] / 2), range(nch), 0) if "RESTFREQ" in self.head.keys(): reffreq = self.head["RESTFREQ"] * 1e-9 elif "RESTFRQ" in self.head.keys(): reffreq = self.head["RESTFRQ"] * 1e-9 else: # unknown reffreq = 0 freqs = reffreq * (1 - vels / const.c) # in GHz self.reffreq = reffreq if "CUNIT3" in self.head.keys() and self.head["CUNIT3"].strip().lower() == "km/s": vel_scale = 1000 else: vel_scale = 1 self.deltafreq = reffreq * (self.head["CDELT3"] * vel_scale / const.c) elif str(self.head["CTYPE3"]).strip() == "WAVE": _, _, wave_lambda = self.wcs.all_pix2world(int(self.im.shape[0] / 2), int(self.im.shape[1] / 2), range(nch), 0) if str(self.head["CUNIT3"]).strip().lower() == "m": freqs = 3e5 / wave_lambda * 1e-9 # in GHz self.deltafreq = 3e5 / self.head["CDELT3"] * 1e-9 self.reffreq = 3e5 / self.head["CRVAL3"] * 1e-9 else: freqs = None self.log("Warning: unknown 3rd axis units.") # if frequencies are given in radio velocity, convert to freqs else: print(str(self.head["CTYPE3"])) freqs = None self.log("Warning: unknown 3rd axis format.") self.freqs = freqs self.nch = nch # populate beam data # single beam from the main header if "BMAJ" in self.head.keys() and "BMIN" in self.head.keys() and "BPA" in self.head.keys(): beam = {"bmaj": self.head["BMAJ"] * 3600, "bmin": self.head["BMIN"] * 3600, "bpa": self.head["BPA"]} else: beam = {"bmaj": 0, "bmin": 0, "bpa": 0} # if there is a beam per channel table present, take it if nch > 1 and len(self.hdu) > 1: beam_table = self.hdu[1].data # this is inserted by CASA, there is unit metadata, but should be arcsec # otherwise clone the single beam across all channels in a new table else: if len(self.im.shape) >= 3: nch = self.im.shape[2] else: nch = 1 beam_table = Table() beam_table.add_columns([Table.Column(name='bmaj', data=np.ones(nch) * beam["bmaj"])]) beam_table.add_columns([Table.Column(name='bmin', data=np.ones(nch) * beam["bmin"])]) beam_table.add_columns([Table.Column(name='bpa', data=np.ones(nch) * beam["bpa"])]) beam_table.add_columns([Table.Column(name='chan', data=range(nch))]) beam_table.add_columns([Table.Column(name='pol', data=np.zeros(nch))]) self.beam = beam_table # model image is Jy/pixel, and since beam volume divides integrated fluxes, set it to 1 in this case if "BUNIT" in self.head.keys() and self.head["BUNIT"].strip().lower().endswith("/pixel"): self.beamvol = np.array([1] * nch) else: # "BUNIT" in self.head.keys() and self.head["BUNIT"].strip().lower().endswith("/beam"): # sometimes BUNIT is not given, although beam is in the header (e.g. the PSF map) self.beamvol = np.pi / (4 * np.log(2)) * self.beam["bmaj"] * self.beam["bmin"] / self.pixsize ** 2 if type(self.beamvol) is Table.Column: self.beamvol = self.beamvol.data
[docs] def write_fitsfile(self, filename: str, overwrite=False): """ Write the current head and im to a new fitsfile (useful if modified). This is still experimental and may fail on certain headers. :param filename: Path string to the output file. :param overwrite: False by default. """ # transpose the cube back hdu = fits.PrimaryHDU(data=self.im.T, header=self.head) hdu.writeto(filename, overwrite=overwrite)
[docs] def log(self, text: str): """ Basic logger function to allow better functionality in the future development. All class functions print info through this wrapper. Could be extended to provide different levels of info, timestamps, or logging to a file. """ print(text)
[docs] def im_mask_values(self, value_to_mask=None, mask_value=np.nan): """Mask specific `values_to_mask` in the image to `mask_value`. For example, if regions with no data have values equal to zero or another fill value they can be set to np.nan. This is required for `get_rms()` to work correctly. :param value_to_mask: value to set to `mask_value`, if `None` the value of the first pixel is used ` :param mask_value: the value of pixels to be masked (default: np.nan) """ if value_to_mask is None: # if not given, it will assumes the first value is the value # to be masked, but only if it is the same as the last value out = self.im[(0,)*self.naxis] out2 = self.im[(-1,)*self.naxis] assert out == out2, "Cannot safely determine value_to_mask and it was not provided." else: out = value_to_mask if out != mask_value: self.im[self.im == out] = mask_value else: print ("Warning: You are masking to the same value.")
[docs] def im_mask_channels(self, channels_to_mask, mask_value=np.nan): """Mask specific `channels_to_mask` to `mask_value`. :param channels_to_mask: list of channels to be masked (python indexing) :param mask_value: the value of pixels to be masked (default: np.nan) """ for chan in channels_to_mask: self.im[:,:,chan] = mask_value
[docs] def get_rms(self): """ Calculate rms for each channel of the cube. Can take some time to compute on large cubes. :return: single rms value if 2D, array odf rms values for each channel if 3D cube """ # TODO: maybe add the option to calculate a single channel only? if self.__rms is None: # calc rms per channel if self.nch > 1: self.__rms = np.zeros(self.nch) self.log("Computing rms in each channel.") for i in range(self.nch): self.__rms[i] = tools.calcrms(self.im[:, :, i]) else: self.log("Computing rms.") self.__rms = np.array([tools.calcrms(self.im)]) return self.__rms
[docs] def set_rms(self, value): self.__rms = value
rms = property(get_rms, set_rms) """Rms (root mean square) per channel."""
[docs] def get_radesys(self): """Read celestial reference frame from header. If not found, it will assume ICRS and raise a warning. N.B. This is rarely used. Findclumps needs it to create ds9 region files.""" try: self.__radesys = self.head['RADESYS'].lower() except KeyError: self.log("Warning: RADEYS keyword not found in header; assuming ICRS when needed.") self.__radesys = "icrs" return self.__radesys
[docs] def set_radesys(self, value): self.__radesys = value
radesys = property(get_radesys, set_radesys) """Reference frame."""
[docs] def deltavel(self, reffreq: float = None): """ Compute channel width in velocity units (km/s). :param reffreq: Computed around specific velocity. If empty, use referent one from the header. :return: Channel width in km/s. Sign reflects how channels are ordered. """ if reffreq is None: reffreq = self.reffreq return self.deltafreq / reffreq * const.c / 1000 # in km/s
[docs] def vels(self, reffreq: float): """ Compute velocities of all cube channels for a given reference frequency. :param reffreq: Reference frequency in GHz. If empty, use referent one from the header. :return: Velocities in km/s. """ if reffreq is None: reffreq = self.reffreq return const.c / 1000 * (1 - self.freqs / reffreq)
[docs] def radec2pix(self, ra=None, dec=None, integer=True): """ Convert ra and dec coordinates into pixels to be used as im[px, py]. If no coords are given, the center of the map is assumed. :param ra: Right ascention in degrees. Or list. :param dec: Declination in degrees. Or list. :param integer: Round the coordinate to an integer value (to be used as indices). :return: Coords x and y in pixels (0 based index). """ # use the central pixel if no coords given if ra is None or dec is None: px = self.im.shape[0] / 2 py = self.im.shape[1] / 2 # otherwise convert radec to pixel coord else: if len(self.wcs.axis_type_names) < 3: px, py = self.wcs.all_world2pix(ra, dec) else: px, py, _ = self.wcs.all_world2pix(ra, dec, self.freqs[0], 0) # need integer indices if integer: px = np.asarray(np.round(px), dtype=int) py = np.asarray(np.round(py), dtype=int) return px, py
[docs] def pix2radec(self, px=None, py=None): """ Convert pixels coordinates into ra and dec. If no coords are given, the center of the map is assumed. :param px: X-axis index. Or list. :param py: Y-axis index. Or list. :return: Coords ra, dec in degrees """ if px is None or py is None: px = self.im.shape[0] / 2 py = self.im.shape[1] / 2 if len(self.wcs.axis_type_names) < 3: ra, dec = self.wcs.all_pix2world(px, py) else: ra, dec, _ = self.wcs.all_pix2world(px, py, self.freqs[0], 0) return ra, dec
[docs] def freq2pix(self, freq: float = None): """ Get the channel number of requested frequency. :param freq: Frequency in GHz. :return: Channel index. """ if len(self.wcs.axis_type_names) < 3: raise ValueError("No frequency axis is present.") return None if freq is None: pz = self.im.shape[2] / 2 pz = int(round(pz)) return pz if freq < np.min(self.freqs) - 0.5 * np.abs(self.deltafreq) \ or freq > np.max(self.freqs) + 0.5 * np.abs(self.deltafreq): raise ValueError("Requested frequency is outside of the available range.") return None # This is ok, unless the 3rd axis is in velocities # ra0 = self.head["CRVAL1"] # dec0 = self.head["CRVAL2"] # _, _, pz = self.wc.all_world2pix(ra0, dec0, freq * 1e9, 0) # pz = int(np.round(pz)) # need integer index # freqs is populated when the cube is loaded, regardless whether the header is in vels or freqs # find the index with the minimum offset pz = np.argmin(np.abs(self.freqs - freq)) return pz
[docs] def pix2freq(self, pz: int = None): """ Get the frequency of a given channel. If no channel is given, the center channel is assumed. :param pz: Channel index. :return: Frequency in GHz. """ if len(self.wcs.axis_type_names) < 3: raise ValueError("No frequency axis is present.") return None if pz is None: pz = self.im.shape[2] / 2 pz = int(round(pz)) if pz < 0 or pz >= len(self.freqs): raise ValueError("Requested channel is outside of the available range.") return None return self.freqs[pz]
[docs] def im2d(self, ch: int = None, freq: float = None, function=np.sum): """Get a 2D map. Convenience function to avoid indexing notation. Provided with a single `ch` (or `freq`, but not both) it will return that channel. Alternatively, `ch` or `freq` can also be a list with the `[start, stop]` of a slice, to which `function` is then applied to make a collapsed image (default: `np.sum`). :param ch: channel index, alternatively: a list with a slice `[start, stop]`` to which `function` is applied :param freq: channel freq, alternatively: a list with a slice `[start, stop]`` to which `function` is applied :param function: function to apply when `ch` or `freq` is a slice (default: `np.sum`), should be a (numpy) function that takes the argument `axis=-1` and aggregates over it :return: 2D numpy array. """ if ch is not None and freq is not None: raise ValueError("Provide either channel or frequency (not both).") return None try: # multiple frequencies? convert to channels ch_tmp = [] for i, f in enumerate(freq): ch_tmp.append(self.freq2pix(f)) ch = ch_tmp except TypeError: # still try multiple channels pass try: # multiple channels? for c in ch: if abs(c) >= self.nch: # allow negative channel indexing raise ValueError("Requested channel is outside of the available range.") return None return function(self.im[:, :, ch[0]:ch[1]], axis=-1) except TypeError: # single channel or frequency if freq is not None: ch = self.freq2pix(freq) elif abs(ch) >= self.nch: # allow negative channel indexing raise ValueError("Requested channel is outside of the available range.") return None return self.im[:, :, ch]
[docs] def distance_grid(self, px: float, py: float) -> np.ndarray: """ Grid of distances from the chosen pixel (can be subpixel accuracy). Uses small angle approximation (simple Pythagorean distances). Distances are measured in numbers of pixels. :param px: Index of x coord. :param py: Index of y coord. :return: 2D grid of distances in pixels, same shape as the cube slice """ xxx = np.arange(self.im.shape[0]) yyy = np.arange(self.im.shape[1]) distances = np.sqrt((yyy[np.newaxis, :] - py) ** 2 + (xxx[:, np.newaxis] - px) ** 2) return distances
[docs] def spectrum(self, ra: float = None, dec: float = None, radius=0.0, px: int = None, py: int = None, channel: int = None, freq: float = None, calc_error=False) \ -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray): """ Extract the spectrum (for 3D cube) or a single flux density value (for 2D map) at a given coord (ra, dec) integrated within a circular aperture of a given radius. Coordinates can be given in degrees (ra, dec) or pixels (px, py). If no coordinates are given, the center of the map is assumed. Single plane can be chosen instead of full spectrum by defining freq or channel. If no radius is given, a single pixel value is extracted (usual units Jy/beam), otherwise aperture integrated spectrum is extracted (usual units of Jy). Note: use the freqs field (or velocities method) to get the x-axis values. :param ra: Right ascention in degrees. :param dec: Declination in degrees. :param radius: Circular aperture radius in arcsec. :param px: Right ascention pixel coord. :param py: Declination pixel coord. :param channel: Force extracton in a single channel of provided index (instead of the full cube). :param freq: Frequency in GHz (alternative to channel). :param calc_error: Set to False to skip error calculations, if the rms computation is slow or not necessary. :return: flux, err, npix: flux (spectrum), error estimate, and number of pixels in the aperture """ if px is None or py is None: px, py = self.radec2pix(ra, dec) if freq is not None: channel = self.freq2pix(freq) # take single pixel value if no aperture radius given if radius <= 0: self.log("Extracting single pixel spectrum.") flux = self.im[px, py, :] if calc_error: err = np.array(self.rms[:]) # single pixel error is just rms else: err = np.full_like(flux, np.nan) npix = np.ones(len(flux)) # use just a single channel if channel is not None: flux = np.array([flux[channel]]) npix = np.array([npix[channel]]) err = np.array([err[channel]]) peak_sb = flux else: self.log("Extracting aperture spectrum.") # grid of distances from the source in arcsec, need for the aperture mask distances = self.distance_grid(px, py) * self.pixsize # select pixels within the aperture w = distances <= radius if channel is not None: npix = np.array([np.sum(np.isfinite(self.im[:, :, channel][w]))]) flux = np.array([np.nansum(self.im[:, :, channel][w]) / self.beamvol[channel]]) peak_sb = np.nanmax(self.im[:, :, channel][w]) if calc_error: err = np.array(self.rms[channel] * np.sqrt(npix / self.beamvol[channel])) else: err = np.array([np.nan]) else: flux = np.zeros(self.nch) peak_sb = np.zeros(self.nch) npix = np.zeros(self.nch) for i in range(self.nch): flux[i] = np.nansum(self.im[:, :, i][w]) / self.beamvol[i] peak_sb[i] = np.nanmax(self.im[:, :, i][w]) npix[i] = np.sum(np.isfinite(self.im[:, :, i][w])) if calc_error: err = np.array(self.rms * np.sqrt(npix / self.beamvol)) else: err = np.full_like(flux, np.nan) return flux, err, npix, peak_sb
[docs] def single_pixel_value(self, ra: float = None, dec: float = None, freq: float = None, channel: int = None, calc_error: bool = False): """ Get a single pixel value at the given coord. Optionally, return associated error. Wrapper function for the "spectrum" method. If None, assumes central pixel. :param ra: Right ascention in degrees. :param dec: Declination in degrees. :param freq: Frequency in GHz. If None, computes whole spectrum. :param channel: Channel index (alternative to freq). :param calc_error: If true, returns also an error estimate (rms). :return: value or (value, error) """ spec, err, _, _ = self.spectrum(ra=ra, dec=dec, radius=0, freq=freq, channel=channel, calc_error=calc_error) if calc_error: return spec, err else: return spec
[docs] def aperture_value(self, ra: float = None, dec: float = None, freq: float = None, channel: int = None, radius: float = 1.0, calc_error=False): """ Get an aperture integrated value at the given coord. Optionally, return associated error. Wrapper function for the "spectrum" method. :param ra: Right ascention in degrees. If None, assumes central pixel. :param dec: Declination in degrees. :param freq: Frequency in GHz. If None, computes whole spectrum. :param channel: Channel index (alternative to freq). :param radius: Aperture radius in arcsec. :param calc_error: If true, returns also an error estimate (rms x sqrt(num of beams in aperture)). :return: value or (value, error) """ flux, err, _, _ = self.spectrum(ra=ra, dec=dec, radius=radius, freq=freq, channel=channel, calc_error=calc_error) if calc_error: return flux, err else: return flux
[docs] def growing_aperture(self, ra: float = None, dec: float = None, freq: float = None, maxradius=1.0, binspacing: float = None, bins: list = None, px: int = None, py: int = None, channel: int = 0, profile=False, calc_error=False) \ -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray): """ Compute curve of growth at the given coordinate position in a circular aperture, growing up to the max radius. Coordinates can be given in degrees (ra, dec) or pixels (px, py). If no coordinates are given, the center of the map is assumed. If no channel is provided, the first one is assumed. :param ra: Right ascention in degrees. :param dec: Declination in degrees. :param freq: Frequency in GHz, takes precedence over channel param. :param maxradius: Max radius for aperture integration in arcsec. :param binspacing: Resolution of the growth flux curve in arcsec, default is one pixel size. :param bins: Custom bins for curve growth (1D np array). :param px: Right ascention pixel coord (alternative to ra). :param py: Declination pixel coord (alternative to dec). :param channel: Index of the cube channel to take (alternative to freq). :param profile: If True, compute azimuthally averaged profile, if False, compute cumulative aperture values :param calc_error: Set to False to skip error calculations, if the rms computation is slow or not necessary. :return: radius, flux, err, npix - all 1D numpy arrays: aperture radius, cumulative flux within it, associated Poissionain error (based on number of beams inside the aprture and the map rms), number of pixels """ self.log("Running growth_curve.") # get coordinates in pixels if px is None or py is None: px, py = self.radec2pix(ra, dec) # get grid of distances from the coordinate distances = self.distance_grid(px, py) * self.pixsize if freq is not None: channel = self.freq2pix(freq) if bins is None: if binspacing is None: # take one pixel size as default size binspacing = self.pixsize bins = np.arange(0, maxradius, binspacing) # histogram will fail if nans are present, select only valid pixels w = np.isfinite(self.im[:, :, channel]) # histogram by default sums the number of pixels npix = np.array(np.histogram(distances[w], bins=bins)[0]) if profile: pass else: npix = np.array(np.cumsum(npix)) # pixel values are added as histogram weights to get the sum of pixel values flux = np.histogram(distances[w], bins=bins, weights=self.im[:, :, channel][w])[0] if profile: # mean value - azimuthally averaged flux = np.array(flux / npix) else: # cumulative flux inside an aperture is the sum of all pixel values divided by the beam volume flux = np.cumsum(flux) / self.beamvol[channel] if calc_error: if profile: # error on the mean nbeam = npix / self.beamvol[channel] nbeam[nbeam < 1] = 1 # if there are less pixels than the beam size, better not count less than 1 err = np.array(self.rms[channel] / np.sqrt(nbeam)) else: # error estimate assuming Poissonian statistics: rms x sqrt(number of independent beams inside aperture) nbeam = npix / self.beamvol[channel] nbeam[nbeam < 1] = 1 err = np.array(self.rms[channel] * np.sqrt(nbeam)) else: err = np.full_like(flux, np.nan) # centers of bins radius = np.array(0.5 * (bins[1:] + bins[:-1])) # old loop version, human readable, but super slow in execution for large apertures and lots of pixels # for i in range(len(bins)-1): # #w=(distances>=bins[i]) & (distances<bins[i+1]) #annulus # w=(distances>=0) & (distances<bins[i+1]) # aperture (cumulative) # npix[i]=np.sum(w) # flux[i]=np.sum(im[w])/beamvol # err[i]=rms*np.sqrt(npix[i]/beamvol) # rms times sqrt of beams used for integration return radius, flux, err, npix
[docs] def aperture_r(self, ra: float = None, dec: float = None, freq: float = None, px: int = None, py: int = None, channel: int = 0, maxradius: float = 1.0, binspacing: float = None, bins: list = None, calc_error: bool = False): """ Obtain integrated flux within a circular aperture as a function of radius. If freq is undefined, will return the spectrum of the 3D cube. Alias function. Check the "growing_aperture" method for details. Units: ra[deg], dec[deg], maxradius[arcsec], binspacing[arcsec], freq[GHz] :return: (radius, flux) or (radius, flux, err) if calc_error """ radius, flux, err, npix = self.growing_aperture(ra=ra, dec=dec, freq=freq, maxradius=maxradius, binspacing=binspacing, bins=bins, px=px, py=py, channel=channel, profile=False, calc_error=calc_error) if calc_error: return radius, flux, err else: return radius, flux
[docs] def profile_r(self, ra: float = None, dec: float = None, freq: float = None, px: int = None, py: int = None, channel: int = 0, maxradius: float = 1.0, binspacing: float = None, bins: list = None, calc_error: bool = False): """ Obtain azimuthaly averaged profile as a function of radius. Alias function. Check the "growing_aperture" method for details. Units: ra[deg], dec[deg], maxradius[arcsec], binspacing[arcsec], freq[GHz] :return: (radius, profile) or (radius, profile err) if calc_error """ radius, profile, err, npix = self.growing_aperture(ra=ra, dec=dec, freq=freq, maxradius=maxradius, binspacing=binspacing, bins=bins, px=px, py=py, channel=channel, profile=True, calc_error=calc_error) if calc_error: return radius, profile, err else: return radius, profile return
[docs] def findclumps_1kernel(self, output_file, rms_region=1./4., minwidth=3, sextractor_param_file='', clean_tmp=True, negative=False, verbose=False): """ FINDCLUMP(s) algorithm (Decarli+2014,Walter+2016). Takes the cube image and outputs the 3d (x,y,wavelength) position of clumps of a minimum SN specified. Works by using a top-hat filter on a rebinned version of the datacube. :param output_file: relative/absolute path to the output catalogue :param rms_region: Region to compute the rms noise [2x2 array in image pixel coord]. If ``None``, takes the central 25% pixels (square) :param minwidth: Number of channels to bin :param sextractor_param_file: if '', gets the default.sex file in interferopy :param clean_tmp: Whether to remove or not the temporary files created by Sextractor :return: """ # keep tmpdir in the same folder as output file basepath = os.path.split(output_file)[0] if negative: tmpdir = os.path.join(basepath, 'tmp_findclumps_kw' + str(minwidth) + '_N/') else: tmpdir = os.path.join(basepath, 'tmp_findclumps_kw' + str(minwidth) + '_P/') if not os.path.exists(tmpdir): os.makedirs(tmpdir) assert not (os.path.isfile(output_file + '_kw' + str( int(minwidth)) + '.cat')), 'Output file "' + output_file + '_kw' + str( int(minwidth)) + '" already exists - please delete or change name!' if minwidth % 2 == 1: chnbox = int((minwidth - 1) / 2.0) else: self.log('WARNING: Window must be odd number of channels, using minwidth +1') chnbox = int(minwidth / 2.0) if negative: # sextractor expects the counter-intuitive freq,dec,ra order (because the header has not been changed) cube = -self.im.T else: # sextractor expects the counter-intuitive freq,dec,ra order (because the header has not been changed) cube = self.im.T nax1 = cube.shape[0] nax2 = cube.shape[1] nax3 = cube.shape[2] if verbose: self.log("Running sextractor for kernel_width={}".format(minwidth)) iterable = tqdm(range(chnbox, nax1 - chnbox - 1)) else: iterable = range(chnbox, nax1 - chnbox - 1) for k in iterable: # prep filenames if negative: name_mask_tmp = 'mask_kernel' + str(minwidth) + '_I' + str(k) + '_negative' else: name_mask_tmp = 'mask_kernel' + str(minwidth) + '_I' + str(k) + '_positive' tmp_fits = os.path.join(tmpdir, name_mask_tmp + '.fits') tmp_list = os.path.join(tmpdir, 'target_' + name_mask_tmp + '.list') # collapsing cube over chosen channel number, saving rms in center im_channel_sum = np.nansum(cube[k - chnbox:k + chnbox + 1, :, :], axis=0) # sum of only nans is 0.0 if np.alltrue(im_channel_sum == 0.0): if verbose: self.log("Slice {} is all nan or zero -- skipping.".format(k)) continue rms = np.nanstd( im_channel_sum[int(nax2 / 2) - int(nax2 * rms_region) - 1:int(nax2 / 2) + int(nax2 * rms_region), int(nax3 / 2) - int(nax3 * rms_region) - 1:int(nax3 / 2) + int(nax3 * rms_region)]) # create fitsfile hdu = fits.PrimaryHDU(data=im_channel_sum, header=self.head) hdul = fits.HDUList([hdu]) hdul.writeto(tmp_fits, overwrite=True) # run Sextractor path_sextractor_files = os.path.dirname(interferopy.__file__ ) + '/data/' if sextractor_param_file=='': sextractor_param_file = path_sextractor_files + 'default.sex' os.system('sex ' + tmp_fits + ' -c ' + sextractor_param_file + ' -PARAMETERS_NAME '+ path_sextractor_files + 'default.param' + ' -CATALOG_NAME ' + tmp_list + ' -VERBOSE_TYPE QUIET') # read output sextractor_cat = np.genfromtxt(tmp_list, skip_header=6) # process output if sextractor_cat.shape == (0,): if clean_tmp: os.remove(tmp_fits) os.remove(tmp_list) continue elif len(sextractor_cat.shape) == 1: sextractor_cat = sextractor_cat.reshape((-1, 6)) # append k , rms, freq to sextractor_cat sextractor_cat = np.hstack([sextractor_cat, np.ones((len(sextractor_cat), 1)) * k, np.ones((len(sextractor_cat), 1)) * rms, np.ones((len(sextractor_cat), 1)) * self.freqs[k]]) if k == chnbox: np.savetxt(fname=output_file + '_kw' + str(int(minwidth)) + '.cat', X=sextractor_cat, header="# 1 SNR_WIN Gaussian-weighted SNR \n \ # 2 FLUX_MAX Peak flux above background [count] \n \ # 3 X_IMAGE Object position along x [pixel] \n \ # 4 Y_IMAGE Object position along y [pixel] \n \ # 5 ALPHA_J2000 Right ascension of barycenter (J2000) [deg] \n \ # 6 DELTA_J2000 Declination of barycenter (J2000) [deg] \n \ # 7 k Central Channel [pixel] \n \ # 8 RMS RMS of collapsed channel map [Jy/beam] \n \ # 9 FREQ CENTRAL FREQUENCY [GHz] [Hz] ") else: with open(output_file + '_kw' + str(int(minwidth)) + '.cat', "ab") as f: np.savetxt(fname=f, X=sextractor_cat) # cleanup intermediate files from this loop if clean_tmp: os.remove(tmp_fits) os.remove(tmp_list) # finally remove tmpdir if clean_tmp: os.rmdir(tmpdir)
[docs] def findclumps_full(self, output_file, kernels=np.arange(3, 20, 2), rms_region=1./4., sextractor_param_file='', clean_tmp=True, ncores=1, run_search=True, run_positive=True, run_negative=True, run_crop=True, SNR_min=3, delta_offset_arcsec=2, delta_freq=0.1, run_fidelity=True, fidelity_bins=np.arange(0, 15, 0.2), min_SN_fit=4.0, fidelity_threshold=0.5, verbose=True): '''Run the full findclumps search and analysis for different kernel sizes, on the positive and/or negative cube(s): 1. Search (see `findclumps_1kernel()`) 2. Crop doubles and trim candidates above `min_SNR` (see `tools.run_line_stats_sex` and `tools_crop_doubles`) 3. Determine the fidelity and produce catalog of candidates (see `tools.fidelity_analysis`) To skip any of the steps, use the appropriate flags. This is intended to be used to re-run parts of the search and/or analysis after changing the parameters (e.g., adding different kernels and/or modifying the cropping criteria). :param output_file: relative/absolute path to the output catalogue :param kernels: list of odd kernel widths to use :param rms_region: Region to compute the rms noise [2x2 array in image pixel coord]. If ``None``, takes the central 25% pixels (square) :param sextractor_param_file: path to sextractor param file :param clean_tmp: cleanup intermediate sextractor files (disable for debugging) :param ncores: number of cores for multiprocessing (done over kernels and pos/neg search) :param run_search: if `False` will skip the search step :param run_positive: if `False` will skip the positive search :param run_negative: if `False` will skip the negative search :param run_crop: if `False` will skip the crop step :param SNR_min: Minimum SN of peaks to retain in the crop :param delta_offset_arcsec: maximum offset to match detections in the cube [arcsec] :param delta_freq: maximum frequency offset to match detections in the cube [GHz] :param run_fidelity: if `False` will skip the fidelity analysis :param fidelity_bins: SN bins for the fidelity analysis :param min_SN_fit: minimum SN for fidelity fit :param fidelity_threshold: Fidelity threshold above which to select candidates :param verbose: increase overall verbosity ''' if run_search: if ncores == 1: for i in kernels: if run_positive: self.findclumps_1kernel(output_file=output_file + '_clumpsP', negative=False, minwidth=i, clean_tmp=clean_tmp, rms_region=rms_region, sextractor_param_file=sextractor_param_file, verbose=verbose) if run_negative: self.findclumps_1kernel(output_file=output_file + '_clumpsN', negative=True, minwidth=i, clean_tmp=clean_tmp, rms_region=rms_region, sextractor_param_file=sextractor_param_file, verbose=verbose) else: from multiprocessing.dummy import Pool as ThreadPool from itertools import repeat kernels = np.atleast_1d(kernels) if run_positive and run_negative: kernels_width_neg_and_pos = np.concatenate((-kernels, kernels)) else: if run_positive: kernels_width_neg_and_pos = kernels elif run_negative: kernels_width_neg_and_pos = -kernels else: # this should not happen raise RuntimeError("ERROR: run_search=True but run_positve=run_negative=False; " "NotImplemented for Multiprocessing: set run_search=False") names = [output_file + '_clumpsN'] * len(kernels) + [output_file + '_clumpsP'] * len(kernels) # arguments: (output_file, rms_region, minwidth, sextractor_param_file, clean_tmp, negative) iterable = zip(names, repeat(rms_region), np.abs(kernels_width_neg_and_pos), repeat(sextractor_param_file), repeat(clean_tmp), (np.sign(kernels_width_neg_and_pos) < 0), repeat(verbose)) with ThreadPool(ncores) as p: p.starmap(self.findclumps_1kernel, iterable) p.close() p.join() if verbose: print('Findclumps done.') if run_crop: if verbose: print('Running line stats and cropping doubles...') # process positive catalog if run_positive: tools.run_line_stats_sex(sextractor_catalogue_name=output_file + '_clumpsP', binning_array=kernels, SNR_min=SNR_min, frame=self.radesys) tools.crop_doubles(cat_name=output_file + "_clumpsP_minSNR_" + str(SNR_min) + ".cat", delta_offset_arcsec=delta_offset_arcsec, delta_freq=delta_freq, verbose=verbose, frame=self.radesys) # process negative catalog if run_negative: tools.run_line_stats_sex(sextractor_catalogue_name=output_file + '_clumpsN', binning_array=kernels, SNR_min=SNR_min, frame=self.radesys) tools.crop_doubles(cat_name=output_file + "_clumpsN_minSNR_" + str(SNR_min) + ".cat", delta_offset_arcsec=delta_offset_arcsec, delta_freq=delta_freq, verbose=verbose, frame=self.radesys) # analyse fidelity if run_fidelity: catP, catN, candP, candN \ = tools.fidelity_analysis(catN_name=output_file + "_clumpsN_minSNR_" + str(SNR_min) + "_cropped.cat", catP_name=output_file + "_clumpsP_minSNR_" + str(SNR_min) + "_cropped.cat", bins=fidelity_bins, min_SN_fit=min_SN_fit, fidelity_threshold=fidelity_threshold, kernels=kernels, verbose=verbose) return catP, catN, candP, candN
[docs] def make_snr_cube(self, store: bool = False, filename: str = None, overwrite: bool=False): """Convert cube into a signal-to-noise cube (i.e., divide by rms), in-place. :param store: save resulting cube under `filename` (default: `False`) :param filename: `filename` (default:`None` appends `_snr` to `self.filename` :param overwrite: overwrite file if it exists """ self.im /= self.rms if store: if filename is None: f, e = os.path.splitext(self.filename) filename = f + '_snr' + e self.write_fitsfile(filename, overwrite=overwrite)
[docs]class MultiCube: """ A container like object to hold multiple cubes at the same time. Cubes are stored in a dictionary. Allows performing tasks such as residual scaled aperture integration or spectrum extraction. Load another cube into MultiCube object by e.g. mc.load_cube("path_to_cube.residual.fits", "residual") :Example: .. code-block:: python filename="cube.fits" ra, dec, freq = (205.533741, 9.477317341, 222.54) # coord for the emission line mc=MultiCube(filename) # will automatically try to load cube.xxx.fits, where xxx is residual, dirty, pb, model, psf, or image.pbcor # Alternatively load each cube manually # mc = MultiCube() # mc.load_cube("somewhere/cube.fits", "image") # mc.load_cube("elsewhere/cube.dirty.fits", "dirty") # mc.load_cube("elsewhere/cube.residual.fits", "residual") # mc.load_cube("elsewhere/cube.pb.fits", "pb") # spectrum extracted from the circular aperture, with associated error, corrected for residual and PB response flux, err, tab = mc.spectrum_corrected(ra=ra, dec=dec, radius=1.5, calc_error=True) # tab.write("spectrum.txt", format="ascii.fixed_width", overwrite=True) # Save results for later # curve of growth up to the maximum radius, in steps of one pixel, in a chosen frequency channel radius, flux, err, tab = mc.growing_aperture_corrected(ra=ra, dec=dec, freq=freq, maxradius=5, calc_error=True) # tab.write("growth.txt", format="ascii.fixed_width", overwrite=True) # Save results for later """ def __init__(self, filename: str = None, autoload_multi=True): """ Provide the file path to the final cleaned cube. Will try to find other adjacent cubes based on their names. Standard key names from CASA are: image, residual, model, psf, pb, image.pbcor Additional names are dirty, clean.comp :param filename: Path string to the cleaned cube fits image. :param autoload_multi: If true, attempt to find other cubes using preset (mostly CASA) suffixes. """ # these are standard suffixes from CASA tclean output (except "dirty") # gildas output has different naming conventions, which are not implemented here keylist = ["image", "residual", "dirty", "pb", "model", "psf", "image.pbcor", "clean.comp"] self.cubes = dict(zip(keylist, [None] * len(keylist))) self.basename = None if filename is None: # just setup the basic dictionary and exit return elif not os.path.exists(filename): raise FileNotFoundError(filename) filenames = dict(zip(keylist, [None] * len(keylist))) filenames["image"] = filename # TODO could improve searching, but there are infinite number of ways people could name these files # get the basename, which is hopefully shared between different cubes extension = ".fits" endings = [".image.tt0.fits", ".image.fits", ".fits"] for ending in endings: if filename.lower().endswith(ending): self.basename = filename[:-len(ending)] extension = ending.replace(".image", "") break # try to fill other filenames if autoload_multi: if self.basename is None: raise ValueError("Unknown extension.") # print("extension",extension) for k in keylist: # expected filename filename_suffixed = self.basename + "." + k + extension if os.path.exists(filename_suffixed): filenames[k] = filename_suffixed # load the cubes for k in self.cubes.keys(): if filenames[k] is not None: self.cubes[k] = Cube(filenames[k]) self.log("Loaded cubes: " + str(self.loaded_cubes))
[docs] def load_cube(self, filename: str, key: str = None): """ Add provided fits file to the MultiCube instance. Known keys are: "image", "residual", "dirty", "pb", "model", "psf", "image.pbcor" If the filename does not end with these words, please provide a manual key. :param filename: Path string to the fits image. :param key: Dictionary key name used to store the cube. :return: """ # try to estimate the key from the filename if key is None: # assuming the file ends in something like .residual.fits suffix = str(filename).split(".")[-2] # if this suffix matches preset ones if suffix in self.cubes.keys(): key = suffix # Load and store new cube cub = Cube(filename) if key is None: raise ValueError("Please provide key under which to store the cube.") if len(self.loaded_cubes) > 0: if cub.im.shape != self[self.loaded_cubes[0]].im.shape: self.log("Warning: Loaded cube has a different shape than existing one!") self[key] = cub
def __getitem__(self, key: str): return self.cubes[key] def __setitem__(self, key: str, value: Cube): self.cubes[key] = value
[docs] def get_loaded_cubes(self): """ Get a list of loaded cubes (those that are not None). :return: List of key names. """ return [k for k in self.cubes.keys() if self.cubes[k] is not None]
loaded_cubes = property(get_loaded_cubes) """List of keys corresponding to loaded cubes."""
[docs] def get_freqs(self): if len(self.loaded_cubes) > 0: return self[self.loaded_cubes[0]].freqs # maybe pick explicitely self["image"].freqs else: raise ValueError("No cubes present!")
freqs = property(get_freqs) """Array of frequencies corresponding to cube channels (from the first loaded cube)."""
[docs] def log(self, text: str): """ Basic logger function to allow better functionality in the future development. All class functions print info through this wrapper. Could be extended to provide different levels of info, timestamps, or logging to a file. """ print(text)
[docs] def make_clean_comp(self, overwrite=False): """ Generate a clean component cube. Defined as the cleaned cube minus the residual, or, alternatively, model image convolved with the clean beam. This cube is not outputted by CASA. :param overwrite: If true, overrides any present "clean.comp" cube. :return: None """ self.log("Generate clean component cube.") if self["image"] is None: raise ValueError("Cleaned cube is missing.") if self["residual"] is None: raise ValueError("Residual cube is missing.") if self["clean.comp"] is not None and not overwrite: self.log("Warning: clean.comp cube already exists and overwriting is disabled.") cub = Cube(self["image"].filename) cub.im = self["image"].im - self["residual"].im cub.filename = None # This cube is no longer the one on disk, so empty the filename as a precaution self["clean.comp"] = cub
# return self["clean.comp"]
[docs] def make_flatnoise(self, overwrite=False): """ Generate a flat noise cube from the primary beam (PB) corrected one, and the PB response (which is <= 1). PB corrected cube has valid fluxes, but rms computation is not straightforward. PB corrected cube = flat noise cube / PB response :param overwrite: If true, overrides any present "image"" cube. :return: None """ self.log("Generate flat noise image cube from the PB corrected one.") if self["image.pbcor"] is None: raise ValueError("PB corrected cleaned cube is missing.") if self["pb"] is None: raise ValueError("PB response cube is missing.") if self["image"] is not None and not overwrite: self.log("Warning: image cube already exists and overwriting is disabled.") cub = Cube(self["image.pbcor"].filename) cub.im = self["image.pbcor"].im * self["pb"].im cub.filename = None # This cube is no longer the one on disk, so empty the filename as a precaution self["image"] = cub
# return self["image"] def __cubes_prepare(self) -> bool: """ Perform several prelimiaries before attempting residual scaled flux extraction. :return: True if no problems are detected, False otherwise. """ # if necessary, use pb corrected map and pb to generate flat noise map only_pbcor_exists = self["image"] is None and self["image.pbcor"] is not None and self["image.pb"] is not None if only_pbcor_exists: self.make_flatnoise() cubes_exists = self["image"] is not None and self["dirty"] is not None and self["residual"] is not None if not cubes_exists: self.log("Error: Need all three cubes: image, dirty, residual!") return False # generate clean component cube (this is not a standard CASA output) # actually it is not necessary to have the full cube for residual scaling # if self["clean.comp"] is None: # self.make_clean_comp() shapes_equal = self["image"].im.shape == self["dirty"].im.shape == self["residual"].im.shape if not shapes_equal: self.log("Error: Cube shapes are not equal!") return False # Residual scaling assumes clean beam in all maps # Override the beam volume in dirty and residual maps to the cleaned one bvol = self["image"].beamvol self["dirty"].beamvol = bvol self["residual"].beamvol = bvol return True
[docs] def spectrum_corrected(self, ra: float = None, dec: float = None, freq: float = None, radius: float = 1.0, px: int = None, py: int = None, channel: int = None, calc_error=True, sn_cut: float = 2.5, apply_pb_corr=True): """ Extract aperture integrated spectrum from the cube using the residual scaling to account for the dirty beam. Correction for the primary beam response is applied if avaliable. Coordinates can be given in degrees (ra, dec) or pixels (px, py). If no coordinates are given, the center of the map is assumed. For details on the method see appendix A in Novak et al. (2019): https://ui.adsabs.harvard.edu/abs/2019ApJ...881...63N/abstract :param ra: Right ascention in degrees. :param dec: Declination in degrees. :param radius: Circular aperture radius in arcsec. :param px: Right ascention pixel coord (alternative to ra). :param py: Declination pixel coord (alternative to dec). :param channel: Channel index (alternative to freq) :param freq: Frequency in GHz. Extract only in a single channel instead of the full cube. :param calc_error: Set to False to skip error calculations, if the rms computation is slow or not necessary. :param sn_cut: Use emission above this S/N threshold to estimate the clean-to-dirty beam ratio, need calc_error. :param apply_pb_corr: Scale flux and error by the primary beam response (single pix value), needs loaded pb map. :return: flux, err, tab: 1D array for corrected flux and error estimate; tab is a Table with all computations. """ # take single pixel value if no aperture radius given if radius <= 0: self.log('No aperture defined - taking a single pixel') # raise ValueError("No aperture is defined!") # check and prepare cubes for residual scaling if not self.__cubes_prepare(): raise ValueError("Cubes check failed!") if freq is not None: channel = self["image"].freq2pix(freq) # run aperture extraction on all cubes # the beam volume should be the same in all of them (checked by __cubes_prepare) params = dict(ra=ra, dec=dec, radius=radius, px=px, py=py, channel=channel, freq=freq) flux_image, err, npix, peak_sb = self["image"].spectrum(calc_error=calc_error, **params) # compute rms only on this map flux_residual, _, _, _ = self["residual"].spectrum(calc_error=False, **params) flux_dirty, _, _, _ = self["dirty"].spectrum(calc_error=False, **params) flux_clean = flux_image - flux_residual # alternatively, force creation of the clean components cube # self.make_clean_comp() # flux_clean, _, _ = self["clean.comp"].spectrum(calc_error=False, **params) # this can be numerically unstable if there is no clean flux or if dirty == residual # nothing much to do about it, it's a limitation of the method epsilon = flux_clean / (flux_dirty - flux_residual) # estimate a single epsilon across the full spectrum # it is not expected to change a lot across channels (if PSF is consistent, and bandwidth is not too large) epsilon_fix = float(np.nanmedian(epsilon)) if calc_error: if channel is None: rmses = self["image"].rms else: rmses = np.array([self["image"].rms[channel]]) # use only high S/N channels w = (flux_clean / err) > sn_cut if np.sum(w) > 0: epsilon_fix = np.nanmedian(epsilon[w]) else: self.log("Warning: Nothing above S/N>" + str(sn_cut) + ", using all channels for epsilon.") else: rmses = np.full_like(flux_image, fill_value=np.nan) self.log("Using all channels for clean-to-dirty beam ratio (epsilon).") # corrected flux is estimated from the dirty map and the clean-to-dirty beam ratio (epsilon) flux = epsilon_fix * flux_dirty err = epsilon_fix * err peak_sb = peak_sb # apply PB correction if possible if apply_pb_corr and "pb" in self.loaded_cubes: self.log("Correcting flux and err for PB response.") pb, _, _, _ = self["pb"].spectrum(ra=ra, dec=dec, px=px, py=py, channel=channel, freq=freq, calc_error=False) flux = flux / pb err = err / pb peak_sb = peak_sb / pb elif apply_pb_corr and "pb" not in self.loaded_cubes: self.log("Warning: Cannot correct for PB, missing pb map.") pb = np.full(len(flux_image), np.nan) else: self.log("Flux is not PB corrected.") pb = np.full(len(flux_image), np.nan) # crude estimate if something went poorly, epsilon is too small or too big if epsilon_fix < 0.01 or epsilon_fix > 100: self.log("Warning: The clean-to-dirty beam ratio (epsilon) is badly determined.") epsilon_fix = np.full(len(flux), epsilon_fix) if channel is None: channels = np.arange(len(flux)) else: channels = [channel] freqs = self.freqs[channels] nbeam = npix / self["image"].beamvol tab = Table([channels, # Channel index freqs, # Frequency of the channel in GHz flux, # Aperture flux corrected for residual and primary beam response err, # Error estimate on the corrected flux epsilon_fix, # Best estimate of the clean-to-dirty beam ratio, fixed across all channels flux_image, # Aperture flux measured in the final map (assumes clean beam) flux_dirty, # Aperture flux measured in the dirty map (assumes clean beam) flux_residual, # Aperture flux measured in the residual map (assumes clean beam) flux_clean, # Aperture flux measured in the clean components map (= final - residual) npix, # Number of pixels inside the aperture nbeam, # Number of beams inside the aperture epsilon, # Clean-to-dirty beam ratio in the channel rmses, # Rms noise of the channel peak_sb, # (Corrected) Peak Surface Brightness of the channel pb], # Primary beam response (<=1) at the given coordinate, single pixel value names=["channel", "freq", "flux", "err", "epsilon_fix", "flux_image", "flux_dirty", "flux_residual", "flux_clean", "npix", "nbeam", "epsilon", "rms", "peak_sb", "pb"]) return np.array(flux), np.array(err), tab
[docs] def growing_aperture_corrected(self, ra: float = None, dec: float = None, freq: float = None, maxradius=1.0, binspacing: float = None, bins: list = None, px: int = None, py: int = None, channel: int = 0, calc_error=True, apply_pb_corr=True, plot=False): """ Extract the curve of growth from the map using the residual scaling to account for the dirty beam. Correction for the primary beam response is applied if avaliable. Coordinates can be given in degrees (ra, dec) or pixels (px, py). If no coordinates are given, the center of the map is assumed. For details on the method see appendix A in Novak et al. (2019): https://ui.adsabs.harvard.edu/abs/2019ApJ...881...63N/abstract :param ra: Right ascention in degrees. :param dec: Declination in degrees. :param freq: Frequency in GHz. :param maxradius: Max radius for aperture integration in arcsec. :param binspacing: Resolution of the growth flux curve in arcsec, default is one pixel size. :param bins: Custom bins for curve growth (1D np array). :param px: Right ascention pixel coord (alternative to ra). :param py: Declination pixel coord (alternative to dec). :param channel: Index of the cube channel to take (alternative to freq). Default is the first channel. :param calc_error: Set to False to skip error calculations, if the rms computation is slow or not necessary. :param apply_pb_corr: Scale flux and error by the primary beam response (single pix value), needs loaded pb map. :return: radius, flux, err, tab: 1D array for radius[arcsec] and corrected flux and error estimate; tab is a Table with all computations. """ # check and prepare cubes for residual scaling if not self.__cubes_prepare(): raise ValueError("Cubes check failed!") # define channel to use if freq is not None: channel = self["image"].freq2pix(freq) # run growing aperture extraction on all cubes in a single channel # the beam volume should be the same in all of them (checked by __cubes_prepare) params = dict(ra=ra, dec=dec, maxradius=maxradius, px=px, py=py, channel=channel, freq=freq, binspacing=binspacing, bins=bins, profile=False) # shared parameters radius, flux_image, err, npix = self["image"].growing_aperture(calc_error=calc_error, **params) # rms here only _, flux_residual, _, _ = self["residual"].growing_aperture(calc_error=False, **params) _, flux_dirty, _, _ = self["dirty"].growing_aperture(calc_error=False, **params) flux_clean = flux_image - flux_residual epsilon = flux_clean / (flux_dirty - flux_residual) flux = np.array(epsilon * flux_dirty) err = epsilon * err nbeam = npix / self["image"].beamvol[channel] # apply PB correction if possible if apply_pb_corr and "pb" in self.loaded_cubes: self.log("Correcting flux and err for PB response.") # will return a single pb value, because channel is set pb, _, _, _ = self["pb"].spectrum(ra=ra, dec=dec, px=px, py=py, channel=channel, freq=freq, calc_error=False) pb = np.full(len(flux_image), pb) flux = np.array(flux / pb) err = np.array(err / pb) elif apply_pb_corr and "pb" not in self.loaded_cubes: self.log("Warning: Cannot correct for PB, missing pb map.") pb = np.full(len(flux_image), np.nan) else: self.log("Flux is not PB corrected.") pb = np.full(len(flux_image), np.nan) tab = Table([radius, # Aperture radius in arcsec. flux, # Aperture flux corrected for residual err, # Error estimate on the corrected flux flux_image, # Aperture flux measured in the final map (assumes clean beam) flux_dirty, # Aperture flux measured in the dirty map (assumes clean beam) flux_residual, # Aperture flux measured in the residual map (assumes clean beam) flux_clean, # Aperture flux measured in the clean components map (= final - residual) epsilon, # Clean-to-dirty beam ratio for a given aperture size npix, # Number of pixels inside the aperture nbeam, # Number of beams inside the aperture: nbeam = npix / beamvol pb], # Primary beam response (<=1) applied to flux and err columns, single value at all radii names=["radius", "flux", "err", "flux_image", "flux_dirty", "flux_residual", "flux_clean", "epsilon", "npix", "nbeam", "pb"]) if plot: tools.plot_growing_aperture_corrected(self, tab) return radius, flux, err, tab