Source code for jwst.cube_build.ifu_cube

"""Work horse routines used for building ifu spectra cubes."""

import logging
import math
import warnings

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.stats import circmean
from gwcs import wcstools
from gwcs.utils import to_index
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import dqflags

from jwst.assign_wcs import nirspec, pointing
from jwst.assign_wcs.util import wrap_ra
from jwst.cube_build import coord, cube_build_wcs_util, cube_internal_cal
from jwst.cube_build.cube_match_sky_driz import cube_wrapper_driz  # c extension
from jwst.cube_build.cube_match_sky_pointcloud import cube_wrapper  # c extension
from jwst.datamodels import ModelContainer
from jwst.model_blender import blendmeta

log = logging.getLogger(__name__)

__all__ = ["IFUCubeData", "IncorrectInputError", "IncorrectParameterError"]


[docs] class IFUCubeData: """ Combine IFU data onto a regular grid. Parameters ---------- pipeline : int Integer that indicates which pipeline is being run: * 2 = :ref:`calwebb_spec2 <calwebb_spec2>` * 3 = :ref:`calwebb_spec3 <calwebb_spec3>` input_models : `~jwst.datamodels.container.ModelContainer` Container of multiple `~stdatamodels.jwst.datamodels.IFUImageModel` output_name_base : str String defining the name of the output product. Unlike other steps, ``cube_build`` determines the name of the output file because the bands of the data are determined after reading in the data. In addition, the name of the output also depend if :ref:`calwebb_spec2 <calwebb_spec2>` or :ref:`calwebb_spec3 <calwebb_spec3>` is being run and if the user has selected particular band to use. output_type : str The type of cube to created. The possibilities are: * ``'band'`` * ``'channel'`` * ``'grating'`` * ``'multi'`` The default for :ref:`calwebb_spec2 <calwebb_spec2>` is to create ``'multi'`` band cubes. In the case of MIRI, the ``'multi'`` band cubes from :ref:`calwebb_spec2 <calwebb_spec2>` contain the two channels from single calibration FITS file. The default type of cube for :ref:`calwebb_spec3 <calwebb_spec3>` is ``'band'``. These cubes will contain a single band of data. linear_wave : bool If true then create a linear wavelength dimension for output cubes. If false then output cubes have a non-linear wavelength spacing. The non-linear spacing is determined by the cube_pars reference file. instrument : str Instrument name, either "MIRI" or "NIRSpec" list_par1 : list For MIRI the list contains the channels used to build the IFU, while for NIRSpec the list contains the grating used. list_par2 : list For MIRI the list contains the sub-channels uses to build the IFU, while for NIRSpec the list contains the filters used. instrument_info : dict Dictionary containing information on the basic instrument parameters. master_table : dict Dictionary of containing the files covering each band. **pars_cube : dict Dictionary of parameters controlling how the cube is built. """ def __init__( self, pipeline, input_models, output_name_base, output_type, linear_wave, instrument, list_par1, list_par2, instrument_info, master_table, **pars_cube, ): self.input_models_this_cube = [] # list of files use to make cube working on self.pipeline = pipeline self.input_models = input_models # needed when building single mode IFU cubes self.output_name_base = output_name_base self.num_files = None self.instrument = instrument self.list_par1 = list_par1 self.list_par2 = list_par2 self.instrument_info = instrument_info # dictionary class imported in cube_build.py self.master_table = master_table self.output_type = output_type self.linear_wave = linear_wave self.scalexy = pars_cube.get("scalexy") self.scalew = pars_cube.get("scalew") self.ra_center = pars_cube.get("ra_center") self.dec_center = pars_cube.get("dec_center") self.cube_pa = pars_cube.get("cube_pa") self.nspax_x = pars_cube.get("nspax_x") self.nspax_y = pars_cube.get("nspax_y") self.offsets = pars_cube.get("offsets") self.rois = pars_cube.get("rois") self.roiw = pars_cube.get("roiw") self.debug_spaxel = pars_cube.get("debug_spaxel") self.spaxel_x, self.spaxel_y, self.spaxel_z = [ int(val) for val in self.debug_spaxel.split() ] self.spatial_size = None self.spectral_size = None self.interpolation = pars_cube.get("interpolation") self.coord_system = pars_cube.get("coord_system") self.wavemin = pars_cube.get("wavemin") self.wavemax = pars_cube.get("wavemax") self.weighting = pars_cube.get("weighting") self.weight_power = pars_cube.get("weight_power") self.skip_dqflagging = pars_cube.get("skip_dqflagging") self.suffix = pars_cube.get("suffix") self.num_bands = 0 self.output_name = "" self.wavemin_user = False # Check for NIRSpec if user has set wavelength limits self.wavemax_user = False self.soft_rad = None self.scalerad = None self.linear_wavelength = True self.roiw_table = None self.rois_table = None self.softrad_table = None self.scalerad_table = None self.weight_power_table = None self.wavelength_table = None self.cdelt1 = None self.cdelt2 = None self.cdelt3 = None self.crpix1 = None self.crpix2 = None self.crpix3 = None self.crval1 = None self.crval2 = None self.crval3 = None self.naxis1 = None self.naxis2 = None self.naxis3 = None self.cdelt3_normal = None self.rot_angle = None # rotation angle between Ra-Dec and IFU local instrument plane self.a_min = 0 self.a_max = 0 self.b_min = 0 self.b_max = 0 self.lambda_min = 0 self.lambda_max = 0 self.xcoord = None self.ycoord = None self.zcoord = None self.tolerance_dq_overlap = 0.05 # spaxel has to have 5% overlap to flag in FOV self.overlap_partial = 4 # intermediate flag self.overlap_full = 2 # intermediate flag self.overlap_hole = dqflags.pixel["DO_NOT_USE"] self.overlap_no_coverage = dqflags.pixel["NON_SCIENCE"] # ________________________________________________________________________________
[docs] def check_ifucube(self): """ Perform some quick checks that the type of cube to be produced conforms to the rules. Raises ------ IncorrectInputError Interpolation = area was selected for when input data is more than one file or model """ num1 = len(self.list_par1) num_files = 0 for i in range(num1): this_a = self.list_par1[i] this_b = self.list_par2[i] n = len(self.master_table.FileMap[self.instrument][this_a][this_b]) num_files = num_files + n self.num_files = num_files # do some basic checks on the cubes if self.coord_system == "internal_cal": if num_files > 1: raise IncorrectInputError( "Cubes built in internal_cal coordinate system" " are built from a single file, not multiple exposures" ) if len(self.list_par1) > 1: raise IncorrectInputError( "Only a single channel or grating " " can be used to create cubes in internal_cal coordinate system." " Use --output_type=band" )
# ________________________________________________________________________________
[docs] def define_cubename(self): """ Define the base output name. Usually the output name is defined by the association table. However in the case of ``cube_build``, several cubes can be created from a single call. The user can override the type of data to combine to make a cube. It is left to ``cube_build`` to determine which channels, bands, gratings, or filters are used to make the IFU cube. The final name includes the channel/band (MIRI) or grating/filter (NIRSpec). Returns ------- newname : str Output name of the IFU cube. """ if self.instrument == "MIRI": # Check to see if the output base name already contains the # field "clear", which sometimes shows up in IFU product # names created by the ASN rules. If so, strip it off, so # that the remaining suffixes created below form the entire # list of optical elements in the final output name. basename = self.output_name_base suffix = basename[basename.rfind("_") + 1 :] if suffix in ["clear"]: self.output_name_base = basename[: basename.rfind("_")] # Now compose the appropriate list of optical element suffix names # based on MRS channel and sub-channel channels = [] for ch in self.list_par1: if ch not in channels: channels.append(ch) number_channels = len(channels) ch_name = "_ch" for i in range(number_channels): ch_name = ch_name + channels[i] if i < number_channels - 1: ch_name = ch_name + "-" # Sort by inverse alphabetical, e.g. short -> medium -> long subchannels = sorted(set(self.list_par2))[::-1] log.info(f"Subchannel listing: {subchannels}") number_subchannels = len(subchannels) b_name = "" for i in range(number_subchannels): b_name = b_name + subchannels[i] b_name = b_name.lower() newname = self.output_name_base + ch_name + "-" + b_name if self.coord_system == "internal_cal": newname = self.output_name_base + ch_name + "-" + b_name + "_internal" elif self.output_type == "single": newname = self.output_name_base + ch_name + "-" + b_name + "_single" elif self.pipeline == 2: newname = self.output_name_base + "_" + self.suffix + ".fits" elif self.instrument == "NIRSPEC": # Check to see if the output base name already has a grating/prism # suffix attached. If so, strip it off, and let the following logic # add all necessary grating and filter suffixes. basename = self.output_name_base suffix = basename[basename.rfind("_") + 1 :] if suffix in ["g140m", "g235m", "g395m", "g140h", "g235h", "g395h", "prism"]: self.output_name_base = basename[: basename.rfind("_")] fg_name = "_" for i in range(len(self.list_par1)): fg_name = fg_name + self.list_par1[i] + "-" + self.list_par2[i] if i < self.num_bands - 1: fg_name = fg_name + "-" fg_name = fg_name.lower() newname = self.output_name_base + fg_name if self.output_type == "single": newname = self.output_name_base + fg_name + "_single" elif self.coord_system == "internal_cal": newname = self.output_name_base + fg_name + "_internal" elif self.pipeline == 2: newname = self.output_name_base + "_" + self.suffix + ".fits" if self.output_type != "single": log.info(f"Output Name: {newname}") return newname
# _______________________________________________________________________
[docs] def set_geometry(self, corner_a, corner_b, lambda_min, lambda_max): """ Set up the WCS of the cube in the tangent plane. Parameters ---------- corner_a : ndarray Array of RA corners of the footprint of all input data. corner_b : ndarray Array of Dec corners of the footprint of all input data. lambda_min : float Minimum wavelength value of the data. lambda_max : float Maximum wavelength value of the data. """ ra_min = np.min(corner_a) ra_max = np.max(corner_a) dec_min = np.min(corner_b) dec_max = np.max(corner_b) dec_ave = (dec_min + dec_max) / 2.0 # we can not average ra values because of the convergence # of hour angles. ravalues = np.zeros(2) ravalues[0] = ra_min ravalues[1] = ra_max # astropy circmean assumes angles are in radians, # we have angles in degrees ra_ave = circmean(ravalues * u.deg).value if self.ra_center is not None: self.crval1 = self.ra_center else: self.crval1 = ra_ave if self.dec_center is not None: self.crval2 = self.dec_center else: self.crval2 = dec_ave rot_angle = self.rot_angle # find the 4 corners xi_corner = [] eta_corner = [] num = len(corner_a) for i in range(num): xi, eta = coord.radec2std(self.crval1, self.crval2, corner_a[i], corner_b[i], rot_angle) xi_corner.append(xi) eta_corner.append(eta) xi_min = min(xi_corner) xi_max = max(xi_corner) eta_min = min(eta_corner) eta_max = max(eta_corner) # find the CRPIX1 CRPIX2 - xi and eta centered at 0,0 # to find location of center abs of min values is how many pixels # we want a symmetric cube centered on xi,eta = 0 xilimit = max(np.abs(xi_min), np.abs(xi_max)) etalimit = max(np.abs(eta_min), np.abs(eta_max)) na = math.ceil((xilimit / self.cdelt1).item()) + 1 nb = math.ceil((etalimit / self.cdelt2).item()) + 1 # if the user set the nspax_x or nspax_y then redefine na, nb # it is assumed that both values are ODD numbers # We want the central pixel to be the tangent point with na/nb pixels on either # side of central pixel. if self.nspax_x is not None: na = int(self.nspax_x / 2) if self.nspax_y is not None: nb = int(self.nspax_y / 2) xi_min = 0.0 - (na * self.cdelt1) - (self.cdelt1 / 2.0) xi_max = (na * self.cdelt1) + (self.cdelt1 / 2.0) eta_min = 0.0 - (nb * self.cdelt2) - (self.cdelt2 / 2.0) eta_max = (nb * self.cdelt2) + (self.cdelt2 / 2.0) self.crpix1 = float(na) + 1.0 self.crpix2 = float(nb) + 1.0 self.naxis1 = na * 2 + 1 self.naxis2 = nb * 2 + 1 self.a_min = xi_min self.a_max = xi_max self.b_min = eta_min self.b_max = eta_max # center of spaxels self.xcoord = np.zeros(self.naxis1) xstart = xi_min + self.cdelt1 / 2.0 self.xcoord = np.arange( start=xstart, stop=xstart + self.naxis1 * self.cdelt1, step=self.cdelt1 ) self.ycoord = np.zeros(self.naxis2) ystart = eta_min + self.cdelt2 / 2.0 self.ycoord = np.arange( start=ystart, stop=ystart + self.naxis2 * self.cdelt2, step=self.cdelt2 ) # depending on the naxis and cdelt values the x,ycoord can have 1 more element than naxis. # Clean up arrays dropping extra values at the end. self.xcoord = self.xcoord[0 : self.naxis1] self.ycoord = self.ycoord[0 : self.naxis2] xv, yv = np.meshgrid(self.xcoord, self.ycoord) self.xcenters = xv.flatten() self.ycenters = yv.flatten() # set up the lambda (z) coordinate of the cube self.cdelt3_normal = None if self.linear_wavelength: self.lambda_min = lambda_min self.lambda_max = lambda_max range_lambda = self.lambda_max - self.lambda_min self.naxis3 = int(math.ceil(range_lambda / self.cdelt3)) # adjust max based on integer value of naxis3 self.lambda_max = self.lambda_min + (self.naxis3) * self.cdelt3 self.zcoord = np.zeros(self.naxis3) # CRPIX3 for FITS is 1 (center of first pixel) # CRVAL3 then is lambda_min + self.cdelt3/ 2.0, which is also zcoord[0] # Note that these are all values at the center of a spaxel self.crval3 = self.lambda_min + self.cdelt3 / 2.0 self.crpix3 = 1.0 zstart = self.lambda_min + self.cdelt3 / 2.0 self.zcoord = np.arange(start=zstart, stop=self.lambda_max, step=self.cdelt3) self.zcoord = self.zcoord[0 : self.naxis3] else: self.naxis3 = len(self.wavelength_table) self.zcoord = np.asarray(self.wavelength_table) self.crval3 = self.wavelength_table[0] self.crpix3 = 1.0 # set up the cdelt3_normal normalizing array used cdelt3_normal = np.zeros(self.naxis3) for j in range(self.naxis3 - 1): cdelt3_normal[j] = self.zcoord[j + 1] - self.zcoord[j] cdelt3_normal[self.naxis3 - 1] = cdelt3_normal[self.naxis3 - 2] self.cdelt3_normal = cdelt3_normal
# _______________________________________________________________________
[docs] def set_geometry_slicer(self, corner_a, corner_b, lambda_min, lambda_max): """ Set up the size of the cube in the internal IFU plane. This will be a single exposure cube. The internal IFU Cube is in the slicer plane and is defined by along slice and across slice coordinates. Parameters ---------- corner_a : ndarray Array of along slice corners of the footprint of all input data. corner_b : ndarray Array of across slice corners of the footprint of all input data. lambda_min : float Minimum wavelength value of the data. lambda_max : float Maximum wavelength value of the data. """ self.a_min = np.min(corner_a) self.a_max = np.max(corner_a) self.b_min = np.min(corner_b) self.b_max = np.max(corner_b) self.lambda_min = lambda_min self.lambda_max = lambda_max # along slice: a # across slice: b alimit = max(np.abs(self.a_min), np.abs(self.a_max)) range_b = self.b_max - self.b_min if self.instrument == "MIRI": along_cdelt = self.cdelt1 n1a = math.ceil(alimit / along_cdelt) n1b = math.ceil(alimit / along_cdelt) self.a_min = 0.0 - (n1a * along_cdelt) - (along_cdelt / 2.0) self.a_max = (n1b * along_cdelt) + (along_cdelt / 2.0) self.naxis1 = n1a + n1b + 1 along_naxis = self.naxis1 self.naxis2 = int(math.ceil(range_b / self.cdelt2)) across_naxis = self.naxis2 across_cdelt = self.cdelt2 if self.instrument == "NIRSPEC": along_cdelt = self.cdelt2 n1a = math.ceil(alimit / along_cdelt) n1b = math.ceil(alimit / along_cdelt) self.a_min = 0.0 - (n1a * along_cdelt) - (along_cdelt / 2.0) self.a_max = (n1b * along_cdelt) + (along_cdelt / 2.0) self.naxis2 = n1a + n1b + 1 along_naxis = self.naxis2 self.naxis1 = int(math.ceil(range_b / self.cdelt1)) across_naxis = self.naxis1 across_cdelt = self.cdelt1 acoord = np.zeros(along_naxis) astart = self.a_min + (along_cdelt / 2.0) for i in range(along_naxis): acoord[i] = astart astart = astart + along_cdelt # set up the across slice parameters b_center = (self.b_max + self.b_min) / 2.0 # adjust min and max based on integer value of naxis2 self.b_max = b_center + (across_naxis / 2.0) * across_cdelt self.b_min = b_center - (across_naxis / 2.0) * across_cdelt across_coord = np.zeros(across_naxis) start = self.b_min + across_cdelt / 2.0 for i in range(across_naxis): across_coord[i] = start start = start + across_cdelt if self.instrument == "MIRI": self.crval1 = self.a_min self.xcoord = acoord self.ycoord = across_coord self.crval2 = self.b_min if self.instrument == "NIRSPEC": self.crval2 = self.a_min self.ycoord = acoord self.xcoord = across_coord self.crval1 = self.b_min # _______________________________________________________________________ # common to both MIRI and NIRSPEC self.crpix1 = 0.5 self.crpix2 = 0.5 # set up the lambda (z) coordinate of the cube range_lambda = self.lambda_max - self.lambda_min self.naxis3 = int(math.ceil(range_lambda / self.cdelt3)) # adjust max based on integer value of naxis3 lambda_center = (self.lambda_max + self.lambda_min) / 2.0 self.lambda_min = lambda_center - (self.naxis3 / 2.0) * self.cdelt3 self.lambda_max = lambda_center + (self.naxis3 / 2.0) * self.cdelt3 self.lambda_max = self.lambda_min + (self.naxis3) * self.cdelt3 self.zcoord = np.zeros(self.naxis3) zstart = self.lambda_min + self.cdelt3 / 2.0 self.crval3 = zstart self.crpix3 = 1.0 for i in range(self.naxis3): self.zcoord[i] = zstart zstart = zstart + self.cdelt3
# _______________________________________________________________________
[docs] def print_cube_geometry(self): """Print to log the general properties of the size of the IFU cube.""" log.info("Cube Geometry:") if self.coord_system == "internal_cal": log.info( "axis# Naxis CRPIX CRVAL CDELT(arcsec) " " Min & Max (along slice, across slice)" ) else: log.info("axis# Naxis CRPIX CRVAL CDELT(arcsec) Min & Max (xi, eta arcsec)") log.info( "Axis 1 %5d %5.2f %12.8f %12.8f %12.8f %12.8f", self.naxis1, self.crpix1, self.crval1, self.cdelt1, self.a_min, self.a_max, ) log.info( "Axis 2 %5d %5.2f %12.8f %12.8f %12.8f %12.8f", self.naxis2, self.crpix2, self.crval2, self.cdelt2, self.b_min, self.b_max, ) if self.linear_wavelength: log.info("axis# Naxis CRPIX CRVAL CDELT(microns) Min & Max (microns)") log.info( "Axis 3 %5d %5.2f %12.8f %12.8f %12.8f %12.8f", self.naxis3, self.crpix3, self.crval3, self.cdelt3, self.lambda_min, self.lambda_max, ) if not self.linear_wavelength: log.info("Non-linear wavelength dimension; CDELT3 variable") log.info("axis# Naxis CRPIX CRVAL Min & Max (microns)") log.info( "Axis 3 %5d %5.2f %12.8f %12.8f %12.8f", self.naxis3, self.crpix3, self.crval3, self.wavelength_table[0], self.wavelength_table[self.naxis3 - 1], ) if self.rot_angle is not None: log.info("Rotation angle between RA-Dec and Slicer-Plane %12.8f", self.rot_angle) if self.instrument == "MIRI": # length of channel and subchannel are the same number_bands = len(self.list_par1) for i in range(number_bands): this_channel = self.list_par1[i] this_subchannel = self.list_par2[i] log.info(f"Cube covers channel, subchannel: {this_channel}, {this_subchannel}") elif self.instrument == "NIRSPEC": # number of filters and gratings are the same number_bands = len(self.list_par1) for i in range(number_bands): this_fwa = self.list_par2[i] this_gwa = self.list_par1[i] log.info(f"Cube covers grating, filter: {this_gwa}, {this_fwa}")
# ________________________________________________________________________________
[docs] def build_ifucube(self): """ Create an IFU cube. 1. Loop over every band contained in the IFU cube and read in the data associated with the band. 2. :meth:`map_detector_to_outputframe`: Maps the detector data to the cube output coordinate system. 3. For each mapped detector pixel, find the IFU cube spaxel located in the region of interest. There are two different routines to do this step, both of which use a C extension to combine the detector fluxes that fall within a region of influence from the spaxel center: a. ``src/cube_match_sky*.c``: This routine uses the modified Shepard method to determine the weighting function, which weights the detector fluxes based on the distance between the detector center and spaxel center. b. ``src/cube_match_internal.c`` is only for single exposure, single band cubes, and the IFU cube in created in the detector plane. The weighting function is based on the overlap of between the detector pixel and spaxel. This method is simplified to determine the overlap in the along slice-wavelength plane. 4. :meth:`find_spaxel_flux`: find the final flux associated with each spaxel 5. :meth:`setup_final_ifucube_model` Returns ------- result : `~stdatamodels.jwst.datamodels.IFUCubeModel` An IFU cube of combined IFU image data. """ self.output_name = self.define_cubename() total_num = self.naxis1 * self.naxis2 * self.naxis3 self.spaxel_flux = np.zeros(total_num, dtype=np.float64) self.spaxel_weight = np.zeros(total_num, dtype=np.float64) self.spaxel_var = np.zeros(total_num, dtype=np.float64) self.spaxel_iflux = np.zeros(total_num, dtype=np.float64) self.spaxel_dq = np.zeros(total_num, dtype=np.uint32) nxyplane = self.naxis1 * self.naxis2 if self.spaxel_z == -1 and self.spaxel_x == -1 and self.spaxel_y == -1: debug_cube_index = -1 elif self.spaxel_z < 0 or self.spaxel_x < 0 or self.spaxel_y < 0: log.info("Incorrect input for Debug Spaxel values. Counting starts at 0") debug_cube_index = -1 log.info(f"{self.spaxel_z} {self.spaxel_x} {self.spaxel_y}") else: spaxel_z = self.spaxel_z spaxel_x = self.spaxel_x spaxel_y = self.spaxel_y debug_cube_index = spaxel_z * (nxyplane) + spaxel_y * self.naxis1 + spaxel_x log.info( f"Printing debug information for cube spaxel: {spaxel_x} {spaxel_y} {spaxel_z}" ) # loop over every file that covers this channel/subchannel (MIRI) or # Grating/filter(NIRSPEC) # and map the detector pixels to the cube spaxel number_bands = len(self.list_par1) k = 0 for ib in range(number_bands): this_par1 = self.list_par1[ib] this_par2 = self.list_par2[ib] for input_model in self.master_table.FileMap[self.instrument][this_par1][this_par2]: # loop over the files that cover the spectral range the cube is for self.input_models_this_cube.append(input_model) # set up input_model to be first file used to copy in basic header info # to ifucube meta data if ib == 0 and k == 0: input_model_ref = input_model log.debug(f"Working on Band defined by: {this_par1} {this_par2}") input_frame = input_model.meta.wcs.available_frames[0] if self.interpolation in ["pointcloud", "drizzle"]: pixelresult = self.map_detector_to_outputframe(this_par1, input_model) ( coord1, coord2, corner_coord, wave, dwave, flux, err, slice_no, rois_pixel, roiw_pixel, weight_pixel, softrad_pixel, scalerad_pixel, x_det, y_det, ) = pixelresult # by default flag the dq plane based on the FOV of the detector projected to sky flag_dq_plane = 1 if self.skip_dqflagging: flag_dq_plane = 0 # check that there is valid data returned # If all the data is flagged as DO_NOT_USE - not common- then log warning build_cube = True if wave is None: log.warning(f"No valid data found on file {input_model.meta.filename}") flag_dq_plane = 0 build_cube = False # C extension setup start_region = 0 end_region = 0 if self.instrument == "MIRI": instrument = 0 start_region = self.instrument_info.get_start_slice(this_par1) end_region = self.instrument_info.get_end_slice(this_par1) else: # NIRSPEC instrument = 1 result = None weight_type = 0 # default to emsm instead of msm if self.weighting == "msm": weight_type = 1 if self.interpolation == "pointcloud" and build_cube: roiw_ave = np.mean(roiw_pixel) result = cube_wrapper( instrument, flag_dq_plane, weight_type, start_region, end_region, self.overlap_partial, self.overlap_full, self.xcoord, self.ycoord, self.zcoord, coord1, coord2, wave, flux, err, slice_no, rois_pixel, roiw_pixel, scalerad_pixel, weight_pixel, softrad_pixel, self.cdelt3_normal, roiw_ave, self.cdelt1, self.cdelt2, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, spaxel_dq = result self.spaxel_flux = self.spaxel_flux + np.asarray(spaxel_flux, np.float64) self.spaxel_weight = self.spaxel_weight + np.asarray( spaxel_weight, np.float64 ) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray(spaxel_iflux, np.float64) spaxel_dq.astype(np.uint) self.spaxel_dq = np.bitwise_or(self.spaxel_dq, spaxel_dq) result = None del result del spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, spaxel_dq if self.weighting == "drizzle" and build_cube: cdelt3_mean = np.nanmean(self.cdelt3_normal) xi1, eta1, xi2, eta2, xi3, eta3, xi4, eta4 = corner_coord linear = 0 if self.linear_wavelength: linear = 1 if debug_cube_index >= 0: log.info(f"Input filename: {input_model.meta.filename}") result = cube_wrapper_driz( instrument, flag_dq_plane, start_region, end_region, self.overlap_partial, self.overlap_full, self.xcoord, self.ycoord, self.zcoord, coord1, coord2, wave, flux, err, slice_no, xi1, eta1, xi2, eta2, xi3, eta3, xi4, eta4, dwave, self.cdelt3_normal, self.cdelt1, self.cdelt2, cdelt3_mean, linear, x_det, y_det, debug_cube_index, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, spaxel_dq = result self.spaxel_flux = self.spaxel_flux + np.asarray(spaxel_flux, np.float64) self.spaxel_weight = self.spaxel_weight + np.asarray( spaxel_weight, np.float64 ) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray(spaxel_iflux, np.float64) spaxel_dq.astype(np.uint) self.spaxel_dq = np.bitwise_or(self.spaxel_dq, spaxel_dq) result = None del result del spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, spaxel_dq # AREA - 2d method only works for single files local slicer plane (internal_cal) elif self.interpolation == "area": # ---------------------------------------------------------------------------- # MIRI # ---------------------------------------------------------------------------- if self.instrument == "MIRI": det2ab_transform = input_model.meta.wcs.get_transform( input_frame, "alpha_beta" ) start_region = self.instrument_info.get_start_slice(this_par1) end_region = self.instrument_info.get_end_slice(this_par1) regions = list(range(start_region, end_region + 1)) for i in regions: log.info("Working on Slice # %d", i) if input_model.hasattr("regions"): # expected for oversampled data slice_det = input_model.regions else: slice_det = det2ab_transform.label_mapper.mapper y, x = (slice_det == i).nonzero() # getting pixel corner - ytop = y + 1 (routine fails for y = 1024) index = np.where(y < 1023) y = y[index] x = x[index] slice_ifu = i - start_region result = cube_internal_cal.match_det2cube( self.instrument, x, y, slice_ifu, input_model, det2ab_transform, self.xcoord, self.zcoord, self.crval1, self.crval3, self.cdelt1, self.cdelt3, self.naxis1, self.naxis2, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux = result self.spaxel_flux = self.spaxel_flux + np.asarray( spaxel_flux, np.float64 ) self.spaxel_weight = self.spaxel_weight + np.asarray( spaxel_weight, np.float64 ) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray( spaxel_iflux, np.float64 ) result = None del spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, result # ---------------------------------------------------------------------------- # NIRSPEC # ---------------------------------------------------------------------------- if self.instrument == "NIRSPEC": nslices = 30 slicemap = [15, 14, 16, 13, 17, 12, 18, 11, 19, 10, 20, 9, 21, 8, 22, 7, 23, 6, 24, 5, 25, 4, 26, 3, 27, 2, 28, 1, 29, 0] # fmt: skip for i in range(nslices): slice_wcs = nirspec.nrs_wcs_set_input(input_model, i) x, y = wcstools.grid_from_bounding_box( slice_wcs.bounding_box, step=(1, 1), center=True ) detector2slicer = slice_wcs.get_transform(input_frame, "slicer") result = cube_internal_cal.match_det2cube( self.instrument, x, y, slicemap[i], input_model, detector2slicer, self.ycoord, self.zcoord, self.crval2, self.crval3, self.cdelt2, self.cdelt3, self.naxis1, self.naxis2, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux = result self.spaxel_flux = self.spaxel_flux + np.asarray( spaxel_flux, np.float64 ) self.spaxel_weight = self.spaxel_weight + np.asarray( spaxel_weight, np.float64 ) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray( spaxel_iflux, np.float64 ) result = None del spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, result k = k + 1 # done looping over files self.find_spaxel_flux() self.set_final_dq_flags() # shove Flux and iflux in the final IFU cube result = self.setup_final_ifucube_model(input_model_ref) return result
# ________________________________________________________________________________
[docs] def build_ifucube_single(self): """ Build a set of single mode IFU cubes. Loop over every band contained in the IFU cube and read in the data associated with the band. Map each band to the output cube coordinate system. Returns ------- single_ifucube_container : `~stdatamodels.jwst.datamodels.IFUCubeModel` A single type IFU cube datamodel """ # loop over input models single_ifucube_container = ModelContainer() weight_type = 0 # default to emsm instead of msm if self.weighting == "msm": weight_type = 1 number_bands = len(self.list_par1) this_par1 = self.list_par1[0] # single IFUcube only have a single channel j = 0 for i in range(number_bands): this_par2 = self.list_par2[i] nfiles = len(self.master_table.FileMap[self.instrument][this_par1][this_par2]) # loop over the files that cover the spectral range the cube is for for k in range(nfiles): input_model = self.master_table.FileMap[self.instrument][this_par1][this_par2][k] self.input_models_this_cube.append(input_model) log.debug(f"Working on next Single IFU Cube = {j + 1}") # for each new data model create a new spaxel total_num = self.naxis1 * self.naxis2 * self.naxis3 self.spaxel_flux = np.zeros(total_num, dtype=np.float64) self.spaxel_weight = np.zeros(total_num, dtype=np.float64) self.spaxel_iflux = np.zeros(total_num) self.spaxel_dq = np.zeros(total_num, dtype=np.uint32) self.spaxel_var = np.zeros(total_num, dtype=np.float64) pixelresult = self.map_detector_to_outputframe(this_par1, input_model) ( coord1, coord2, corner_coord, wave, dwave, flux, err, slice_no, rois_pixel, roiw_pixel, weight_pixel, softrad_pixel, scalerad_pixel, _, _, ) = pixelresult build_cube = True # there is no valid data on the detector. Pixels are flagged as DO_NOT_USE. if wave is None: build_cube = False # The following values are not needed in cube_wrapper because the DQ plane # is not being filled in. flag_dq_plane = 0 start_region = 0 end_region = 0 roiw_ave = 0 if self.instrument == "MIRI": instrument = 0 else: # NIRSPEC instrument = 1 result = None if self.interpolation == "pointcloud" and build_cube: roiw_ave = np.mean(roiw_pixel) result = cube_wrapper( instrument, flag_dq_plane, weight_type, start_region, end_region, self.overlap_partial, self.overlap_full, self.xcoord, self.ycoord, self.zcoord, coord1, coord2, wave, flux, err, slice_no, rois_pixel, roiw_pixel, scalerad_pixel, weight_pixel, softrad_pixel, self.cdelt3_normal, roiw_ave, self.cdelt1, self.cdelt2, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, _ = result self.spaxel_flux = self.spaxel_flux + np.asarray(spaxel_flux, np.float64) self.spaxel_weight = self.spaxel_weight + np.asarray(spaxel_weight, np.float64) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray(spaxel_iflux, np.float64) result = None del result, spaxel_flux, spaxel_var, spaxel_iflux if self.weighting == "drizzle" and build_cube: cdelt3_mean = np.nanmean(self.cdelt3_normal) xi1, eta1, xi2, eta2, xi3, eta3, xi4, eta4 = corner_coord linear = 0 if self.linear_wavelength: linear = 1 x_det = None y_det = None debug_cube = -1 result = cube_wrapper_driz( instrument, flag_dq_plane, start_region, end_region, self.overlap_partial, self.overlap_full, self.xcoord, self.ycoord, self.zcoord, coord1, coord2, wave, flux, err, slice_no, xi1, eta1, xi2, eta2, xi3, eta3, xi4, eta4, dwave, self.cdelt3_normal, self.cdelt1, self.cdelt2, cdelt3_mean, linear, x_det, y_det, debug_cube, ) spaxel_flux, spaxel_weight, spaxel_var, spaxel_iflux, _ = result self.spaxel_flux = self.spaxel_flux + np.asarray(spaxel_flux, np.float64) self.spaxel_weight = self.spaxel_weight + np.asarray(spaxel_weight, np.float64) self.spaxel_var = self.spaxel_var + np.asarray(spaxel_var, np.float64) self.spaxel_iflux = self.spaxel_iflux + np.asarray(spaxel_iflux, np.float64) result = None del result, spaxel_flux, spaxel_var, spaxel_iflux # shove Flux and iflux in the final ifucube self.find_spaxel_flux() # determine Cube Spaxel flux status = 0 result = self.setup_final_ifucube_model(input_model) ifucube_model, status = result single_ifucube_container.append(ifucube_model) if status != 0: log.debug("Possible problem with single ifu cube, no valid data in cube") j = j + 1 return single_ifucube_container
# ________________________________________________________________________________
[docs] def determine_cube_parameters_internal(self): """Determine the spatial and spectral IFU size for ``coord_system=internal_cal``.""" # internal_cal is for only 1 file and weighting= area # no msm or emsm information is needed par1 = self.list_par1[0] par2 = self.list_par2[0] a_scale, b_scale, w_scale = self.instrument_info.get_scale(par1, par2) self.spatial_size = a_scale if self.scalexy != 0: self.spatial_size = self.scalexy min_wave = self.instrument_info.get_wave_min(par1, par2) max_wave = self.instrument_info.get_wave_max(par1, par2) if self.wavemin is None: self.wavemin = min_wave else: self.wavemin = np.float64(self.wavemin) self.wavemin_user = True if self.wavemax is None: self.wavemax = max_wave else: self.wavemax = np.float64(self.wavemax) self.wavemax_user = True if self.scalew != 0: self.spectral_size = self.scalew self.linear_wavelength = True else: self.spectral_size = w_scale self.linear_wavelength = True
# ________________________________________________________________________________
[docs] def determine_cube_parameters(self): """ Determine the spatial and wavelength ROI size if IFU covers more than 1 band of data. If the IFU cube covers more than 1 band, then use the rules to define the spatial and wavelength ROI size to use for the cube. """ # initialize wave_roi = None weight_power = None number_bands = len(self.list_par1) spaxelsize = np.zeros(number_bands) spectralsize = np.zeros(number_bands) rois = np.zeros(number_bands) roiw = np.zeros(number_bands) power = np.zeros(number_bands) softrad = np.zeros(number_bands) scalerad = np.zeros(number_bands) minwave = np.zeros(number_bands) maxwave = np.zeros(number_bands) for i in range(number_bands): if self.instrument == "MIRI": par1 = self.list_par1[i] par2 = self.list_par2[i] elif self.instrument == "NIRSPEC": par1 = self.list_par1[i] par2 = self.list_par2[i] a_scale, b_scale, w_scale = self.instrument_info.get_scale(par1, par2) spaxelsize[i] = a_scale spectralsize[i] = w_scale minwave[i] = self.instrument_info.get_wave_min(par1, par2) maxwave[i] = self.instrument_info.get_wave_max(par1, par2) # pull out the values from the cube pars reference file roiw[i] = self.instrument_info.get_wave_roi(par1, par2) rois[i] = self.instrument_info.get_spatial_roi(par1, par2) power[i] = self.instrument_info.get_msm_power(par1, par2) softrad[i] = self.instrument_info.get_soft_rad(par1, par2) scalerad[i] = self.instrument_info.get_scale_rad(par1, par2) # Check the spatial size. If it is the same for the array set up the parameters all_same = np.all(spaxelsize == spaxelsize[0]) if all_same: self.spatial_size = spaxelsize[0] spatial_roi = rois[0] # if it is not the same then use the minimum value else: index_min = np.argmin(spaxelsize) self.spatial_size = spaxelsize[index_min] spatial_roi = rois[index_min] # find min and max wavelength min_wave = np.amin(minwave) max_wave = np.amax(maxwave) if self.wavemin is None: self.wavemin = min_wave else: self.wavemin = np.float64(self.wavemin) self.wavemin_user = True if self.wavemax is None: self.wavemax = max_wave else: self.wavemax = np.float64(self.wavemax) self.wavemax_user = True # now check spectral step - this will determine # if the wavelength dimension is linear or not all_same_spectral = np.all(spectralsize == spectralsize[0]) # check if scalew has been set - if yes then linear scale if self.scalew != 0: self.spectral_size = self.scalew self.linear_wavelength = True wave_roi = np.amin(roiw) weight_power = np.amin(power) self.soft_rad = np.amin(softrad) self.scalerad = np.amin(scalerad) # if we have NIRSPEC Prism then force wavelength to be non-linear elif self.instrument == "NIRSPEC" and not self.linear_wave: self.linear_wavelength = False # determine if have Prism, Medium or High resolution med = ["g140m", "g235m", "g395m"] high = ["g140h", "g235h", "g395h"] prism = ["prism"] for i in range(number_bands): par1 = self.list_par1[i] if par1 in prism: table = self.instrument_info.get_prism_table() if par1 in med: table = self.instrument_info.get_med_table() if par1 in high: table = self.instrument_info.get_high_table() ( table_wavelength, table_sroi, table_wroi, table_power, table_softrad, table_scalerad, ) = table # if all bands have the same spectral size then linear_wavelength elif all_same_spectral: self.spectral_size = spectralsize[0] wave_roi = roiw[0] weight_power = power[0] self.linear_wavelength = True # added this 10/01/19 self.soft_rad = softrad[0] self.scalerad = scalerad[0] else: self.linear_wavelength = False if self.instrument == "MIRI": table = self.instrument_info.get_multichannel_table() ( table_wavelength, table_sroi, table_wroi, table_power, table_softrad, table_scalerad, ) = table # getting NIRSPEC Table Values elif self.instrument == "NIRSPEC": # determine if have Prism, Medium or High resolution med = ["g140m", "g235m", "g395m"] high = ["g140h", "g235h", "g395h"] prism = ["prism"] for i in range(number_bands): par1 = self.list_par1[i] if par1 in prism: table = self.instrument_info.get_prism_table() if par1 in med: table = self.instrument_info.get_med_table() if par1 in high: table = self.instrument_info.get_high_table() ( table_wavelength, table_sroi, table_wroi, table_power, table_softrad, table_scalerad, ) = table # non-linear wavelength range. # Based on Min and Max wavelength - pull out the tables values that fall in this range. # Find the closest table entries to the self.wavemin and self.wavemax limits if not self.linear_wavelength: imin = (np.abs(table_wavelength - self.wavemin)).argmin() imax = (np.abs(table_wavelength - self.wavemax)).argmin() if imin > 1 and table_wavelength[imin] > self.wavemin: imin = imin - 1 if imax < (len(table_wavelength) - 1) and self.wavemax > table_wavelength[imax]: imax = imax + 1 num_table = imax - imin + 1 self.scalerad_table = np.zeros(num_table) self.scalerad_table[:] = table_scalerad[imin : imax + 1] self.softrad_table = np.zeros(num_table) self.softrad_table[:] = table_softrad[imin : imax + 1] self.roiw_table = np.zeros(num_table) self.roiw_table[:] = table_wroi[imin : imax + 1] self.rois_table = np.zeros(num_table) self.rois_table[:] = table_sroi[imin : imax + 1] if self.num_files < 4: self.rois_table = self.rois_table * 1.5 self.weight_power_table = np.zeros(num_table) self.weight_power_table[:] = table_power[imin : imax + 1] self.wavelength_table = np.zeros(num_table) self.wavelength_table[:] = table_wavelength[imin : imax + 1] # check if using default values from the table (not user set) if self.rois == 0.0: self.rois = spatial_roi # not set by user but determined from tables # default rois in tables is designed with a 4 dither pattern # increase rois if less than 4 file if self.output_type == "single" or self.num_files < 4: # We don't need to increase it if using 'emsm' weighting if self.weighting.lower() != "emsm": self.rois = self.rois * 1.5 log.info( "Increasing spatial region of interest " f"default value set for 4 dithers {self.rois}" ) # set wave_roi and weight_power to same values if they are in list if self.roiw == 0: self.roiw = wave_roi if self.weight_power == 0: self.weight_power = weight_power if self.scalexy != 0: self.spatial_size = self.scalexy # check on valid values found_error = False if self.linear_wavelength: # check we have valid data for key values if self.interpolation == "pointcloud": if np.isnan(self.rois): log.error("Spatial roi is nan, possible reference file value error") found_error = True if np.isnan(self.roiw): log.error("Spectral roi is nan, possible reference file value error") found_error = True if self.weighting == "msm": if np.isnan(self.weight_power): log.error("Weight power is nan, possible reference file value error") found_error = True if np.isnan(self.soft_rad): log.error("Soft rad is nan, possible reference file value error") found_error = True if self.weighting == "emsm": if np.isnan(self.scalerad): log.error("Scalerad is nan, possible reference file value error") found_error = True else: if np.isnan(self.wavelength_table).all(): log.error("Wavelength table contains all nans, possible reference file value error") found_error = True if self.interpolation == "pointcloud": if np.isnan(self.rois_table).all(): log.error( "Spatial roi table contains all nans, possible reference file value error" ) found_error = True if np.isnan(self.roiw_table).all(): log.error( "Spectral roi table contains all nans, possible reference file value error" ) found_error = True if self.weighting == "msm": if np.isnan(self.softrad_table).all(): log.error( "Soft rad table contains all nans, possible reference file value error" ) found_error = True if np.isnan(self.weight_power_table).all(): log.error( "Weight power table contains all nans, possible reference file value error" ) found_error = True if self.weighting == "emsm": if np.isnan(self.scalerad_table).all(): log.error( "Scalerad table contains all nans, possible reference file value error" ) found_error = True if found_error: raise IncorrectParameterError("An essential parameter is = nan, refer to error message") # catch where self.weight_power = nan weighting = msm written to header # TODO update writing to header scalerad if weighting = emsm if self.weight_power is not None: if np.isnan(self.weight_power): self.weight_power = None log.debug(f"spatial size {self.spatial_size}") log.debug(f"spectral size {self.spectral_size}") log.debug(f"spatial roi {self.rois}") log.debug(f"wave min and max {self.wavemin} {self.wavemax}") log.debug(f"linear wavelength {self.linear_wavelength}") log.debug(f"roiw {self.roiw}") log.debug(f"output_type {self.output_type}") if self.weighting == "msm": log.debug(f"weight_power {self.weight_power}") log.debug(f"softrad {self.soft_rad}") if self.weighting == "emsm": log.debug(f"scalerad {self.scalerad}")
# ________________________________________________________________________________
[docs] def setup_ifucube_wcs(self): """ Set up the WCS of the IFU cube. Loop over every datamodel contained in the cube and find the WCS of the output cube that contains all the data. Notes ----- If the coordinate system is ``internal_cal``, then min and max coordinates of along slice, across slice, and lambda (microns). For MIRI, the units along/across slice dimension are arcseconds. For NIRSPEC the units along/across slice dimension are meters. If the coordinate system is ``skyalign``/``ifualign``, then the min and max of RA (degrees), dec (degrees), and lambda (microns) are returned for internal calculations. """ self.cdelt1 = self.spatial_size self.cdelt2 = self.spatial_size if self.linear_wavelength: self.cdelt3 = self.spectral_size parameter1 = self.list_par1 parameter2 = self.list_par2 # Define the rotation angle # If coord_system = ifualign then the angle is between the ra-dec and alpha beta # coord system using the first input model. Use first file in first band to set up # rotation angle. # Compute the rotation angle between local IFU system and RA-DEC if self.coord_system == "ifualign": this_a = parameter1[0] # 0 is first band - this_a is channel this_b = parameter2[0] # 0 is first band - this_b is sub-channel log.info(f"Defining rotation between ra-dec and IFU plane using {this_a}, {this_b}") # first file for this band input_model = self.master_table.FileMap[self.instrument][this_a][this_b][0] input_frame = input_model.meta.wcs.available_frames[0] if self.instrument == "MIRI": xstart, xend = self.instrument_info.get_miri_slice_endpts(this_a) xc_start, xc_end = cube_build_wcs_util.miri_slice_limit_coords( input_model.meta.wcs, xstart, xend ) ysize = input_model.data.shape[0] y, x = np.mgrid[:ysize, xc_start:xc_end] detector2alpha_beta = input_model.meta.wcs.get_transform(input_frame, "alpha_beta") alpha, beta, lam = detector2alpha_beta(x, y) lam_med = np.nanmedian(lam) # pick two alpha, beta values to determine rotation angle # values in arc seconds alpha_beta2world = input_model.meta.wcs.get_transform( "alpha_beta", input_model.meta.wcs.output_frame.name ) temp_ra1, temp_dec1, lam_temp = alpha_beta2world(0, 0, lam_med) temp_ra2, temp_dec2, lam_temp = alpha_beta2world(2, 0, lam_med) elif self.instrument == "NIRSPEC": slice_wcs = nirspec.nrs_wcs_set_input(input_model, 0) x, y = wcstools.grid_from_bounding_box( slice_wcs.bounding_box, step=(1, 1), center=True ) detector2slicer = slice_wcs.get_transform(input_frame, "slicer") across, along, lam = detector2slicer(x, y) # lam ~0 for this transform lam_med = np.nanmedian(lam) # pick two along slice, across slice values to determine rotation angle # values in meters slicer2world = slice_wcs.get_transform("slicer", slice_wcs.output_frame.name) temp_ra1, temp_dec1, lam_temp = slicer2world(0, 0, lam_med) temp_ra2, temp_dec2, lam_temp = slicer2world(0, 0.005, lam_med) # temp_dec1 is in degrees dra, ddec = ( (temp_ra2 - temp_ra1) * np.cos(temp_dec1 * np.pi / 180.0), (temp_dec2 - temp_dec1), ) self.rot_angle = 90 + np.arctan2(dra, ddec) * 180.0 / np.pi log.info(f"Rotation angle between ifu and sky: {self.rot_angle}") # If coord_system = iskyalign and the user provided a position angle. # Define the rotation angle to be the user provided value. if self.coord_system == "skyalign" and self.cube_pa is not None: self.rot_angle = self.cube_pa log.info(f"Setting rotation angle between ifu and sky: {self.rot_angle}") # now loop over data and find min and max ranges data covers corner_a = [] corner_b = [] lambda_min = [] lambda_max = [] self.num_bands = len(self.list_par1) log.debug(f"Number of bands in cube: {self.num_bands}") for i in range(self.num_bands): this_a = parameter1[i] this_b = parameter2[i] log.debug(f"Working on data from {this_a}, {this_b}") n = len(self.master_table.FileMap[self.instrument][this_a][this_b]) log.debug(f"number of files, {n}") for k in range(n): lmin = 0.0 lmax = 0.0 input_model = self.master_table.FileMap[self.instrument][this_a][this_b][k] # If offsets are provided. Pull in ra and dec offsets. raoffset = 0.0 decoffset = 0.0 # pull out ra dec offset if it exists if self.offsets is not None: raoffset, decoffset = self.find_ra_dec_offset(input_model.meta.filename) # Find the footprint of the image spectral_found = hasattr(input_model.meta.wcsinfo, "spectral_region") spatial_found = hasattr(input_model.meta.wcsinfo, "s_region") world = False if self.coord_system == "skyalign" and self.cube_pa is None: world = True # Do not use the default spatial or spectral region found in the wcs if # 1. instrument is MIRI and # 2. Output type is not multi and (not default calspec2) and # 3. Channel is 1 or 3 - channel with smaller FOV on detector if self.instrument == "MIRI" and self.output_type != "multi": ch1 = "1" ch3 = "3" if ch1 in self.list_par1 or ch3 in self.list_par1: spatial_found = False spectral_found = False # If Moving Target data, then do not use S_REGION values. # The S_REGION values contain the footprint # on the sky of the original WCS. target_type = input_model.meta.target.type if target_type == "MOVING": spatial_found = False if spectral_found and spatial_found and world: [lmin, lmax] = input_model.meta.wcsinfo.spectral_region spatial_box = input_model.meta.wcsinfo.s_region s = spatial_box.split(" ") ca1 = float(s[3]) cb1 = float(s[4]) ca2 = float(s[5]) cb2 = float(s[6]) ca3 = float(s[7]) cb3 = float(s[8]) ca4 = float(s[9]) cb4 = float(s[10]) else: log.info("Mapping all pixels to output to determine IFU foot print") if self.instrument == "NIRSPEC": ch_corners = cube_build_wcs_util.find_corners_nirspec( input_model, self.coord_system ) ca1, cb1, ca2, cb2, ca3, cb3, ca4, cb4, lmin, lmax = ch_corners if self.instrument == "MIRI": ch_corners = cube_build_wcs_util.find_corners_miri( input_model, this_a, self.instrument_info, self.coord_system ) ca1, cb1, ca2, cb2, ca3, cb3, ca4, cb4, lmin, lmax = ch_corners # now append this model spatial and spectral corner if self.offsets is not None: ca1, cb1 = self.offset_coord(ca1, cb1, raoffset, decoffset) ca2, cb2 = self.offset_coord(ca2, cb2, raoffset, decoffset) ca3, cb3 = self.offset_coord(ca3, cb3, raoffset, decoffset) ca4, cb4 = self.offset_coord(ca4, cb4, raoffset, decoffset) corner_a.append(ca1) corner_a.append(ca2) corner_a.append(ca3) corner_a.append(ca4) corner_b.append(cb1) corner_b.append(cb2) corner_b.append(cb3) corner_b.append(cb4) lambda_min.append(lmin) lambda_max.append(lmax) # done looping over files determine final size of cube corner_a = np.array(corner_a) corner_a = wrap_ra(corner_a) final_a_min = min(corner_a) final_a_max = max(corner_a) final_b_min = min(corner_b) final_b_max = max(corner_b) log.debug(f"final a and b:{final_a_min, final_b_min, final_a_max, final_b_max}") log.debug(f" min and max wavelengths: {min(lambda_min), max(lambda_max)}") # the wavelength limits of cube are determined from 1. User or 2. cubepars # reference file (in the priority) final_lambda_min = self.wavemin final_lambda_max = self.wavemax log.debug(f" final min and max used in IFUcube: {final_lambda_min, final_lambda_max}") # ______________________________________________________________________ if self.instrument == "MIRI" and self.coord_system == "internal_cal": # we have a 1 to 1 mapping in y across slice dimension nslice = self.instrument_info.get_nslice(parameter1[0]) log.info(f"Across slice scale {self.cdelt2}") self.cdelt2 = (final_b_max - final_b_min) / nslice final_b_max = final_b_min + (nslice) * self.cdelt2 log.info( "Changed the across slice scale dimension so we have 1-1" " mapping between b and slice #" ) log.info(f"New across slice scale {self.cdelt2}") # ______________________________________________________________________ if self.instrument == "NIRSPEC" and self.coord_system == "internal_cal": # we have a 1 to 1 mapping in x - across slice dimension. nslice = 30 log.info(f"Across slice scale {self.cdelt1}") self.cdelt1 = (final_b_max - final_b_min) / nslice final_b_max = final_b_min + (nslice) * self.cdelt1 log.info( "Changed the across slice scale dimension so we have 1-1" " mapping between b and slice #" ) log.info(f"New across slice Scale {self.cdelt1}") self.cdelt2 = self.cdelt1 / 2.0 # Test that we have data (NIRSPEC NRS2 only has IFU data for 3 configurations) test_a = final_a_max - final_a_min test_b = final_b_max - final_b_min tolerance1 = 0.00001 if test_a < tolerance1 or test_b < tolerance1: log.info(f"No Valid IFU slice data found {test_a} {test_b}") # Based on Scaling and Min and Max values determine naxis1, naxis2, naxis3 # set cube CRVALs, CRPIXs if self.coord_system == "skyalign" or self.coord_system == "ifualign": self.set_geometry(corner_a, corner_b, final_lambda_min, final_lambda_max) else: self.set_geometry_slicer(corner_a, corner_b, final_lambda_min, final_lambda_max) self.print_cube_geometry()
# ________________________________________________________________________________
[docs] def map_detector_to_outputframe(self, this_par1, input_model): """ Loop over a file and map the detector pixels to the output cube. The output frame is on the sky (RA-Dec). Return the coordinates of all the detector pixel in the output frame. In addition, an array of pixel fluxes and weighing parameters are determined. The pixel flux and weighing parameters are used later in the process to find the final flux of a cube spaxel based on the pixel fluxes and pixel weighing parameters that fall within the ROI of spaxel center Parameters ---------- this_par1 : str For MIRI, this is the channel number (1, 2, 3, or 4); needed for MIRI to distinguish which channel on the detector we have. For NIRSPEC, this is the grating name. input_model : `~stdatamodels.jwst.datamodels.IFUImageModel` Input IFU image model to combine. Returns ------- coord1 : ndarray Coordinate for ``axis1`` in output cube for mapped pixel coord2 : ndarray Coordinate for ``axis2`` in output cube for mapped pixel wave : ndarray Wavelength associated with ```coord1, coord2`` flux : ndarray Flux associated with ``coord1, coord2`` err : ndarray Error associated with ``coord1, coord2`` rois_det : float Spatial ROI size to use roiw_det : ndarray Spectral ROI size associated with ``coord1,coord2`` weight_det : ndarray Weighting parameter associated with ``coord1,coord2`` softrad_det : ndarray Softrad parameter associated with ``coord1,coord2`` """ # initialize alpha_det and beta_det to None. These are filled in # if the instrument is MIRI and the weighting is miripsf wave_all = None slice_no_all = None # Slice number dwave_all = None corner_coord_all = None # initialize values to be returned to None dwave = None corner_coord = None coord1 = None coord2 = None wave = None flux = None err = None slice_no = None rois_det = None roiw_det = None weight_det = None softrad_det = None scalerad_det = None x_det = None y_det = None offsets = self.offsets if self.instrument == "MIRI": sky_result = self.map_miri_pixel_to_sky(input_model, this_par1, offsets) (x, y, ra, dec, wave_all, slice_no_all, dwave_all, corner_coord_all) = sky_result elif self.instrument == "NIRSPEC": sky_result = self.map_nirspec_pixel_to_sky(input_model, offsets) (x, y, ra, dec, wave_all, slice_no_all, dwave_all, corner_coord_all) = sky_result # ______________________________________________________________________________ # The following is for both MIRI and NIRSPEC flux_all = input_model.data[y, x] err_all = input_model.err[y, x] dq_all = input_model.dq[y, x] valid2 = np.isfinite(flux_all) x_all = x y_all = y # Pre-select only data within a given wavelength range # This range is defined to include all pixels for which the chosen wavelength region # of interest would have them fall within one of the cube spectral planes # Note that the cube lambda refer to the spaxel midpoints, so we must account for both # the spaxel width and the ROI size if self.linear_wavelength: min_wave_tolerance = self.zcoord[0] - self.roiw max_wave_tolerance = self.zcoord[-1] + self.roiw else: min_wave_tolerance = self.zcoord[0] - np.max(self.roiw_table) max_wave_tolerance = self.zcoord[-1] + np.max(self.roiw_table) if self.interpolation == "drizzle": dmax = np.nanmax(dwave_all) if self.linear_wavelength: min_wave_tolerance = self.zcoord[0] - (self.cdelt3 + dmax) max_wave_tolerance = self.zcoord[-1] + (self.cdelt3 + dmax) else: min_wave_tolerance = self.zcoord[0] - self.cdelt3_normal[0] max_wave_tolerance = self.zcoord[-1] + self.cdelt3_normal[-1] valid_min = np.where(wave_all >= min_wave_tolerance) not_mapped_low = wave_all.size - len(valid_min[0]) valid_max = np.where(wave_all <= max_wave_tolerance) not_mapped_high = wave_all.size - len(valid_max[0]) if not_mapped_low > 0: log.info( "# of detector pixels not mapped to output plane: " f"{not_mapped_low} with wavelength below {min_wave_tolerance}" ) if not_mapped_high > 0: log.info( "# of detector pixels not mapped to output plane: " f"{not_mapped_high} with wavelength above {max_wave_tolerance}" ) # using the DQFlags from the input_image find pixels that should be excluded # from the cube mapping valid3 = np.logical_and(wave_all >= min_wave_tolerance, wave_all <= max_wave_tolerance) # find the location of good data bad1 = np.bitwise_and(dq_all, dqflags.pixel["DO_NOT_USE"]).astype(bool) bad2 = np.bitwise_and(dq_all, dqflags.pixel["NON_SCIENCE"]).astype(bool) good_data = np.where(~bad1 & ~bad2 & valid2 & valid3) num_good = len(good_data[0]) if num_good == 0: # This can occur if all the pixels on the detector are marked DO_NOT_USE. log.warning(f"No valid pixels found on detector {input_model.meta.filename}") return ( coord1, coord2, corner_coord, wave, dwave, flux, err, slice_no, rois_det, roiw_det, weight_det, softrad_det, scalerad_det, x_det, y_det, ) # good data holds the location of pixels we want to map to cube # define variables as numpy arrays (numba needs this defined) flux_all_good = flux_all[good_data] good_shape = flux_all_good.shape flux = np.zeros(good_shape, dtype=np.float64) err = np.zeros(good_shape, dtype=np.float64) coord1 = np.zeros(good_shape, dtype=np.float64) coord2 = np.zeros(good_shape, dtype=np.float64) wave = np.zeros(good_shape, dtype=np.float64) slice_no = np.zeros(good_shape) flux[:] = flux_all_good err[:] = err_all[good_data] wave[:] = wave_all[good_data] slice_no[:] = slice_no_all[good_data] x_det = x_all[good_data] y_det = y_all[good_data] log.debug(f"After removing pixels min and max wave: {np.min(wave)} {np.max(wave)}") # based on the wavelength define the sroi, wroi, weight_power and # softrad to use in matching detector to spaxel values rois_det = np.zeros(wave.shape) roiw_det = np.zeros(wave.shape) weight_det = np.zeros(wave.shape) softrad_det = np.zeros(wave.shape) scalerad_det = np.zeros(wave.shape) # ________________________________________________________________________ # if working with msm or emsm need roi sizes and other parameters defined: if self.weighting == "msm" or self.weighting == "emsm": if self.linear_wavelength: rois_det[:] = self.rois roiw_det[:] = self.roiw weight_det[:] = self.weight_power softrad_det[:] = self.soft_rad scalerad_det[:] = self.scalerad else: # for each wavelength find the closest point in the self.wavelength_table for iw, w in enumerate(wave): self.find_closest_wave( iw, w, self.wavelength_table, self.rois_table, self.roiw_table, self.softrad_table, self.weight_power_table, self.scalerad_table, rois_det, roiw_det, softrad_det, weight_det, scalerad_det, ) ra_use = ra[good_data] dec_use = dec[good_data] coord1, coord2 = coord.radec2std(self.crval1, self.crval2, ra_use, dec_use, self.rot_angle) if self.interpolation == "drizzle": dwave = np.zeros(good_shape) dwave[:] = dwave_all[good_data] ra1 = corner_coord_all[0] dec1 = corner_coord_all[1] ra2 = corner_coord_all[2] dec2 = corner_coord_all[3] ra3 = corner_coord_all[4] dec3 = corner_coord_all[5] ra4 = corner_coord_all[6] dec4 = corner_coord_all[7] ra1 = ra1[good_data] dec1 = dec1[good_data] ra2 = ra2[good_data] dec2 = dec2[good_data] ra3 = ra3[good_data] dec3 = dec3[good_data] ra4 = ra4[good_data] dec4 = dec4[good_data] xi1, eta1 = coord.radec2std(self.crval1, self.crval2, ra1, dec1, self.rot_angle) xi2, eta2 = coord.radec2std(self.crval1, self.crval2, ra2, dec2, self.rot_angle) xi3, eta3 = coord.radec2std(self.crval1, self.crval2, ra3, dec3, self.rot_angle) xi4, eta4 = coord.radec2std(self.crval1, self.crval2, ra4, dec4, self.rot_angle) corner_coord = [xi1, eta1, xi2, eta2, xi3, eta3, xi4, eta4] return ( coord1, coord2, corner_coord, wave, dwave, flux, err, slice_no, rois_det, roiw_det, weight_det, softrad_det, scalerad_det, x_det, y_det, )
# ______________________________________________________________________
[docs] def map_miri_pixel_to_sky(self, input_model, this_par1, offsets): """ Loop over a MIRI model and map the detector pixels to the output cube. The output frame is on the sky (RA-Dec). Return the coordinates of all the detector pixel in the output frame for every valid input pixel from the IFU image model. Parameters ---------- input_model : `~stdatamodels.jwst.datamodels.IFUImageModel` Input IFU image model to combine this_par1 : str For MIRI, this is the channel number. needed for MIRI to distinguish which channel on the detector we have. For NIRSpec, this is the grating name. offsets : dict Optional dictionary of RA and Dec offsets to apply Returns ------- x, y : float The pixel values on the detector ra, dec : float Detector values mapped to sky wave : float Wavelength corresponding to pixel slice_no : int Slice number of the pixel dwave : float Delta wavelength covered by pixel corner_coord : tuple The corners of the pixel mapped to ``ra,dec`` """ wave = None slice_no = None # Slice number dwave = None corner_coord = None raoffset = 0.0 decoffset = 0.0 # pull out ra dec offset if it exists if offsets is not None: raoffset, decoffset = self.find_ra_dec_offset(input_model.meta.filename) log.info( "RA and Dec offset (arc seconds) applied to file :%8.6f, %8.6f, %s", raoffset.value, decoffset.value, input_model.meta.filename, ) # find the slice number of each pixel and fill in slice_det ysize, xsize = input_model.data.shape if input_model.hasattr("regions"): # expected for oversampled data slice_det = input_model.regions else: # find the slice number of each pixel and fill in slice_det slice_det = np.zeros((ysize, xsize), dtype=int) det2ab_transform = input_model.meta.wcs.get_transform("detector", "alpha_beta") mapper = det2ab_transform.label_mapper.mapper start_region = self.instrument_info.get_start_slice(this_par1) end_region = self.instrument_info.get_end_slice(this_par1) regions = list(range(start_region, end_region + 1)) for i in regions: ys, xs = (mapper == i).nonzero() xind = to_index(xs) yind = to_index(ys) xind = np.ndarray.flatten(xind) yind = np.ndarray.flatten(yind) slice_det[yind, xind] = i # define the x,y detector values of channel to be mapped to desired coordinate system xstart, xend = self.instrument_info.get_miri_slice_endpts(this_par1) xc_start, xc_end = cube_build_wcs_util.miri_slice_limit_coords( input_model.meta.wcs, xstart, xend ) y, x = np.mgrid[:ysize, xc_start:xc_end] y = np.reshape(y, y.size) x = np.reshape(x, x.size) # if self.coord_system == 'skyalign' or self.coord_system == 'ifualign': with warnings.catch_warnings(): warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) ra, dec, wave = input_model.meta.wcs(x, y) # offset the central pixel if offsets is not None: ra, dec = self.offset_coord(ra, dec, raoffset, decoffset) valid1 = ~np.isnan(ra) ra = ra[valid1] dec = dec[valid1] wave = wave[valid1] x = x[valid1] y = y[valid1] xind = to_index(x) yind = to_index(y) xind = np.ndarray.flatten(xind) yind = np.ndarray.flatten(yind) slice_no = slice_det[yind, xind] input_frame = input_model.meta.wcs.available_frames[0] if self.interpolation == "drizzle": # Delta wavelengths _, _, wave1 = input_model.meta.wcs(x, y - 0.4999) _, _, wave2 = input_model.meta.wcs(x, y + 0.4999) dwave = np.abs(wave1 - wave2) # Pixel corners pixfrac = 1.0 alpha1, beta, _ = input_model.meta.wcs.transform( input_frame, "alpha_beta", x - 0.4999 * pixfrac, y ) alpha2, _, _ = input_model.meta.wcs.transform( input_frame, "alpha_beta", x + 0.4999 * pixfrac, y ) # Find slice width allbetaval = np.unique(beta) dbeta = np.abs(allbetaval[1] - allbetaval[0]) ra1, dec1, _ = input_model.meta.wcs.transform( "alpha_beta", input_model.meta.wcs.output_frame, alpha1, beta - dbeta * pixfrac / 2.0, wave, ) ra2, dec2, _ = input_model.meta.wcs.transform( "alpha_beta", input_model.meta.wcs.output_frame, alpha1, beta + dbeta * pixfrac / 2.0, wave, ) ra3, dec3, _ = input_model.meta.wcs.transform( "alpha_beta", input_model.meta.wcs.output_frame, alpha2, beta + dbeta * pixfrac / 2.0, wave, ) ra4, dec4, _ = input_model.meta.wcs.transform( "alpha_beta", input_model.meta.wcs.output_frame, alpha2, beta - dbeta * pixfrac / 2.0, wave, ) # now offset the pixel corners if offsets is not None: ra1, dec1 = self.offset_coord(ra1, dec1, raoffset, decoffset) ra2, dec2 = self.offset_coord(ra2, dec2, raoffset, decoffset) ra3, dec3 = self.offset_coord(ra3, dec3, raoffset, decoffset) ra4, dec4 = self.offset_coord(ra4, dec4, raoffset, decoffset) corner_coord = [ra1, dec1, ra2, dec2, ra3, dec3, ra4, dec4] sky_result = (x, y, ra, dec, wave, slice_no, dwave, corner_coord) return sky_result
# ______________________________________________________________________
[docs] def map_nirspec_pixel_to_sky(self, input_model, offsets): """ Loop over a NIRSpec model and map the detector pixels to the output cube. The output frame is on the sky (RA-Dec). Return the coordinates of all the detector pixel in the output frame for every valid input pixel from the IFU image model. Parameters ---------- input_model : `~stdatamodels.jwst.datamodels.IFUImageModel` Input IFU image model to combine offsets : ndarray RA and Dec offsets to apply to each file Returns ------- x, y : float The pixel values on the detector ra, dec : float Detector values mapped to sky wave : float Wavelength corresponding to pixel slice_no : int Slice number of the pixel dwave : float Delta wavelength covered by pixel corner_coord : tuple Rhe corners of the pixel mapped to ``ra,dec``` """ # check if we have an ra and dec offset file raoffset = 0.0 decoffset = 0.0 # pull out ra dec offset if it exists if offsets is not None: raoffset, decoffset = self.find_ra_dec_offset(input_model.meta.filename) log.info( "RA and Dec offset (arc seconds) applied to file :%8.6f, %8.6f, %s", raoffset.value, decoffset.value, input_model.meta.filename, ) # initialize the ra,dec, and wavelength arrays # we will loop over slice_nos and fill in values # the flag_det will be set when a slice_no pixel is filled in # at the end we will use this flag to pull out valid data ysize, xsize = input_model.data.shape ra_det = np.zeros((ysize, xsize)) dec_det = np.zeros((ysize, xsize)) lam_det = np.zeros((ysize, xsize)) flag_det = np.zeros((ysize, xsize)) slice_det = np.zeros((ysize, xsize), dtype=int) dwave_det = np.zeros((ysize, xsize)) ra1_det = np.zeros((ysize, xsize)) ra2_det = np.zeros((ysize, xsize)) ra3_det = np.zeros((ysize, xsize)) ra4_det = np.zeros((ysize, xsize)) dec1_det = np.zeros((ysize, xsize)) dec2_det = np.zeros((ysize, xsize)) dec3_det = np.zeros((ysize, xsize)) dec4_det = np.zeros((ysize, xsize)) # determine the slice width using slice 1 and 3 input_frame = input_model.meta.wcs.available_frames[0] slice_wcs1 = nirspec.nrs_wcs_set_input(input_model, 0) detector2slicer = slice_wcs1.get_transform(input_frame, "slicer") mean_x, mean_y = np.mean(slice_wcs1.bounding_box[0]), np.mean(slice_wcs1.bounding_box[1]) slice_loc1, _, _ = detector2slicer(mean_x, mean_y) slice_wcs3 = nirspec.nrs_wcs_set_input(input_model, 2) detector2slicer = slice_wcs3.get_transform(input_frame, "slicer") mean_x, mean_y = np.mean(slice_wcs3.bounding_box[0]), np.mean(slice_wcs3.bounding_box[1]) slice_loc3, _, _ = detector2slicer(mean_x, mean_y) across_width = abs(slice_loc1 - slice_loc3) # for NIRSPEC each file has 30 slices # wcs information access separately for each slice nslices = 30 log.info("Mapping each NIRSpec slice to sky for input file: %s", input_model.meta.filename) for ii in range(nslices): slice_wcs = nirspec.nrs_wcs_set_input(input_model, ii) x, y = wcstools.grid_from_bounding_box(slice_wcs.bounding_box) ra, dec, lam = slice_wcs(x, y) # the slices are curved on detector so a rectangular region returns NaNs valid = ~np.isnan(lam) x = x[valid] y = y[valid] ra = ra[valid] dec = dec[valid] lam = lam[valid] if self.interpolation == "drizzle": # Delta wavelengths _, _, wave1 = slice_wcs(x - 0.4999, y) _, _, wave2 = slice_wcs(x + 0.4999, y) dwave = np.abs(wave1 - wave2) # Pixel corners pixfrac = 1.0 detector2slicer = slice_wcs.get_transform(input_frame, "slicer") slicer2world = slice_wcs.get_transform("slicer", slice_wcs.output_frame) across1, along1, lam1 = detector2slicer(x, y - 0.49 * pixfrac) across2, along2, lam2 = detector2slicer(x, y + 0.49 * pixfrac) # Ensure that our ordering wraps around the footprint instead of crossing # footprint on a diagonal ra1, dec1, _ = slicer2world(across1 - across_width * pixfrac / 2, along1, lam1) ra2, dec2, _ = slicer2world(across1 + across_width * pixfrac / 2, along1, lam1) ra3, dec3, _ = slicer2world(across2 + across_width * pixfrac / 2, along2, lam2) ra4, dec4, _ = slicer2world(across2 - across_width * pixfrac / 2, along2, lam2) # near the slice boundaries the corners can become Nan - do not use pixels with # Nan corners valid1 = np.logical_and(~np.isnan(ra1), ~np.isnan(ra2)) valid2 = np.logical_and(~np.isnan(ra3), ~np.isnan(ra4)) final = np.where(np.logical_and(valid1, valid2)) x = x[final] y = y[final] ra = ra[final] dec = dec[final] lam = lam[final] ra1 = ra1[final] dec1 = dec1[final] ra2 = ra2[final] dec2 = dec2[final] ra3 = ra3[final] dec3 = dec3[final] ra4 = ra4[final] dec4 = dec4[final] dwave = dwave[final] xind = to_index(x) yind = to_index(y) xind = np.ndarray.flatten(xind) yind = np.ndarray.flatten(yind) ra = np.ndarray.flatten(ra) dec = np.ndarray.flatten(dec) lam = np.ndarray.flatten(lam) ra_det[yind, xind] = ra dec_det[yind, xind] = dec lam_det[yind, xind] = lam flag_det[yind, xind] = 1 slice_det[yind, xind] = ii + 1 # fill in corner values dwave_det[yind, xind] = dwave ra1_det[yind, xind] = ra1 ra2_det[yind, xind] = ra2 ra3_det[yind, xind] = ra3 ra4_det[yind, xind] = ra4 dec1_det[yind, xind] = dec1 dec2_det[yind, xind] = dec2 dec3_det[yind, xind] = dec3 dec4_det[yind, xind] = dec4 else: # not drizzling xind = to_index(x) yind = to_index(y) xind = np.ndarray.flatten(xind) yind = np.ndarray.flatten(yind) ra = np.ndarray.flatten(ra) dec = np.ndarray.flatten(dec) lam = np.ndarray.flatten(lam) ra_det[yind, xind] = ra dec_det[yind, xind] = dec lam_det[yind, xind] = lam flag_det[yind, xind] = 1 slice_det[yind, xind] = ii + 1 # after looping over slices - pull out valid values valid_data = np.where(flag_det == 1) y, x = valid_data wave = lam_det[valid_data] slice_no = slice_det[valid_data] dwave = dwave_det[valid_data] ra = ra_det[valid_data] dec = dec_det[valid_data] ra1 = ra1_det[valid_data] ra2 = ra2_det[valid_data] ra3 = ra3_det[valid_data] ra4 = ra4_det[valid_data] dec1 = dec1_det[valid_data] dec2 = dec2_det[valid_data] dec3 = dec3_det[valid_data] dec4 = dec4_det[valid_data] if offsets is not None: # central pixel ra, dec = self.offset_coord(ra, dec, raoffset, decoffset) # pixel corners ra1, dec1 = self.offset_coord(ra1, dec1, raoffset, decoffset) ra2, dec2 = self.offset_coord(ra2, dec2, raoffset, decoffset) ra3, dec3 = self.offset_coord(ra3, dec3, raoffset, decoffset) ra4, dec4 = self.offset_coord(ra4, dec4, raoffset, decoffset) corner_coord = [ra1, dec1, ra2, dec2, ra3, dec3, ra4, dec4] sky_result = (x, y, ra, dec, wave, slice_no, dwave, corner_coord) return sky_result
# ________________________________________________________________________________
[docs] def find_closest_wave( self, iw, w, wavelength_table, rois_table, roiw_table, softrad_table, weight_power_table, scalerad_table, rois_det, roiw_det, softrad_det, weight_det, scalerad_det, ): """ Given a specific wavelength, find the closest value in the ``wavelength_table``. Parameters ---------- iw : int Index of wavelength array. w : float Wavelength array of data. wavelength_table : ndarray Wavelength array read from :ref:`cubepar_reffile`. rois_table : ndarray ``rois`` array read from :ref:`cubepar_reffile`. roiw_table : ndarray ``roiw`` array read from :ref:`cubepar_reffile`. softrad_table : ndarray Softrad array read from :ref:`cubepar_reffile`. weight_power_table : ndarray Weight power array read from :ref:`cubepar_reffile`. scalerad_table : ndarray Scalerad array read from :ref:`cubepar_reffile`. rois_det : ndarray ``rois`` array of detector pixel for the associated wavelength of the pixel. roiw_det : ndarray ``roiw`` array of detector pixel for the associated wavelength of the pixel. softrad_det : ndarray Softrad array of detector pixel for the associated wavelength of the pixel. weight_det : ndarray Weight array of detector pixel for the associated wavelength of the pixel. scalerad_det : ndarray Scalerad array of detector pixel for the associated wavelength of the pixel. """ ifound = (np.abs(wavelength_table - w)).argmin() rois_det[iw] = rois_table[ifound] roiw_det[iw] = roiw_table[ifound] softrad_det[iw] = softrad_table[ifound] weight_det[iw] = weight_power_table[ifound] scalerad_det[iw] = scalerad_table[ifound]
# ________________________________________________________________________________
[docs] def find_spaxel_flux(self): """Depending on the interpolation method, find the flux for each spaxel value.""" # currently these are the same but in the future there could be a difference in # how the spaxel flux is determined according to self.interpolation. if self.interpolation == "area": good = self.spaxel_iflux > 0 self.spaxel_flux[good] = self.spaxel_flux[good] / self.spaxel_weight[good] self.spaxel_var[good] = self.spaxel_var[good] / ( self.spaxel_weight[good] * self.spaxel_weight[good] ) elif self.interpolation == "pointcloud" or self.interpolation == "drizzle": # Don't apply any normalization if no points contributed to a spaxel # (i.e., don't divide by zero) good = self.spaxel_iflux > 0 # Normalize the weighted sum of pixel fluxes by the sum of the weights self.spaxel_flux[good] = self.spaxel_flux[good] / self.spaxel_weight[good] # Normalize the variance by the square of the weights self.spaxel_var[good] = self.spaxel_var[good] / ( self.spaxel_weight[good] * self.spaxel_weight[good] )
# ________________________________________________________________________________
[docs] def set_final_dq_flags(self): """ Set up the final DQ flags. These flags include: * Good data (0) * NON_SCIENCE * DO_NOT_USE. """ # An initial set of dq flags was set in overlap_fov_with_spaxel or # overlap_slice_with_spaxel. The initial dq dlags are defined in ifu_cube # class: # self.overlap_partial = 4 # intermediate flag # self.overlap_full = 2 # intermediate flag # self.overlap_hole = dqflags.pixel['DO_NOT_USE'] # self.overlap_no_coverage = dqflags.pixel['NON_SCIENCE'] (also bitwise and with # dqflags.pixel['DO_NOT_USE'] ) # compare the weight plane and spaxel_dq. The initial spaxel_dq flagging # has too small a FOV in NIRSpec line mapping case. # flatten to match the size of spaxel_weight self.spaxel_dq = np.ndarray.flatten(self.spaxel_dq) # the fov is an underestimate. Check the spaxel_weight plane # if weight map > 0 then set spaxel_dq to overlap_partial under_data = self.spaxel_weight > 0 self.spaxel_dq[under_data] = self.overlap_partial # convert all remaining spaxel_dq of 0 to NON_SCIENCE + DO_NOT_USE # these pixel should have no overlap with the data non_science = self.spaxel_dq == 0 self.spaxel_dq[non_science] = np.bitwise_or( self.overlap_no_coverage, dqflags.pixel["DO_NOT_USE"] ) # refine where good data should be ind_full = np.where(np.bitwise_and(self.spaxel_dq, self.overlap_full)) ind_partial = np.where(np.bitwise_and(self.spaxel_dq, self.overlap_partial)) self.spaxel_dq[ind_full] = 0 self.spaxel_dq[ind_partial] = 0 location_holes = np.where((self.spaxel_dq == 0) & (self.spaxel_weight == 0)) self.spaxel_dq[location_holes] = self.overlap_hole # one last check. Remove pixels flagged as hole but have 1 adjacent spaxel # that has no coverage (NON_SCIENCE). If NON_SCIENCE flag is next to pixel # flagged as hole then set the Hole flag to NON_SCIENCE spaxel_dq_temp = self.spaxel_dq nxy = self.naxis1 * self.naxis2 index = np.where(self.spaxel_dq == self.overlap_hole) for i in range(len(index[0])): iwave = int(index[0][i] / nxy) rem = index[0][i] - iwave * nxy yrem = int(rem / self.naxis1) xrem = rem - yrem * self.naxis1 found = 0 ij = 0 # do not allow holes to occur at the edge of IFU cube if yrem == 0 or yrem == (self.naxis2 - 1) or xrem == 0 or xrem == (self.naxis1 - 1): spaxel_dq_temp[index[0][i]] = np.bitwise_or( self.overlap_no_coverage, dqflags.pixel["DO_NOT_USE"] ) found = 1 # flag as NON_SCIENCE instead of hole if left, right, top, bottom pixel # is NON_SCIENCE xcheck = np.zeros(4, dtype=int) ycheck = np.zeros(4, dtype=int) # left xcheck[0] = xrem - 1 ycheck[0] = yrem # right xcheck[1] = xrem + 1 ycheck[1] = yrem # bottom xcheck[2] = xrem ycheck[2] = yrem - 1 # top xcheck[3] = xrem ycheck[3] = yrem + 1 while (ij < 4) and (found == 0): if ( xcheck[ij] > 0 and xcheck[ij] < self.naxis1 and ycheck[ij] > 0 and ycheck[ij] < self.naxis2 ): index_check = iwave * nxy + ycheck[ij] * self.naxis1 + xcheck[ij] # If the nearby spaxel_dq contains overlap_no_coverage # then unmark dq flag as hole. A hole has to have nearby # pixels all in FOV. check = ( np.bitwise_and(self.spaxel_dq[index_check], self.overlap_no_coverage) == self.overlap_no_coverage ) if check: spaxel_dq_temp[index[0][i]] = np.bitwise_or( self.overlap_no_coverage, dqflags.pixel["DO_NOT_USE"] ) found = 1 ij = ij + 1 self.spaxel_dq = spaxel_dq_temp location_holes = np.where(self.spaxel_dq == self.overlap_hole) ave_holes = len(location_holes[0]) / self.naxis3 if ave_holes < 1: log.info("Average # of holes/wavelength plane is < 1") else: log.info("Average # of holes/wavelength plane: %i", ave_holes) log.info("Total # of holes for IFU cube is : %i", len(location_holes[0]))
# ________________________________________________________________________________
[docs] def setup_final_ifucube_model(self, model_ref): """ Set up the final meta WCS info of IFU cube along with other FITS keywords. Parameters ---------- model_ref : `~stdatamodels.jwst.datamodels.IFUImageModel` The first IFU image model to use to fill in basic header values. Returns ------- result : `~stdatamodels.jwst.datamodels.IFUCubeModel` IFU cube datamodel with data arrays filled in. """ status = 0 # loop over the wavelength planes to confirm each plane has some data # for initial or final planes that do not have any data - eliminated them # from the IFUcube # Rearrange values from 1d vectors into 3d cubes flux = self.spaxel_flux.reshape((self.naxis3, self.naxis2, self.naxis1)) wmap = self.spaxel_iflux.reshape((self.naxis3, self.naxis2, self.naxis1)) var = self.spaxel_var.reshape((self.naxis3, self.naxis2, self.naxis1)) dq = self.spaxel_dq.reshape((self.naxis3, self.naxis2, self.naxis1)) # For MIRI MRS, apply a quality cut to help fix spectral tearing at the ends of # each band. This is largely taken care of by the WCS regions file, but there # will still be 1-2 possibly problematic planes at the end of each band in # multi-band cubes. Do this by looking for how many good spaxels there are at # each wavelength and finding outliers from the trend. if self.instrument == "MIRI": nz = flux.shape[0] # Create a vector of the number of good spaxels at each wavelength ngood = np.zeros(nz) for zz in range(0, nz): dqvec = dq[zz, :, :].ravel() good = np.where(dqvec == 0) ngood[zz] = len(good[0]) # Find where this vector is non-zero, and compute 1% threshold of those good values good = np.where(ngood > 0) if len(good[0]) > 0: pctile = np.percentile(ngood[good], 3) # Figure out where the number of good values were less than 75% of threshold, # and zero out those arrays. lowcov = (np.where((ngood > 0) & (ngood < 0.75 * pctile)))[0] nlowcov = len(lowcov) log.info("Number of spectral tear planes adjusted: %i", nlowcov) for zz in range(0, nlowcov): flux[lowcov[zz], :, :] = 0 wmap[lowcov[zz], :, :] = 0 var[lowcov[zz], :, :] = 0 dq[lowcov[zz], :, :] = ( dqflags.pixel["DO_NOT_USE"] + dqflags.pixel["NON_SCIENCE"] ) # Set np.nan values wherever the DO_NOT_USE flag is set dnu = np.where((dq & dqflags.pixel["DO_NOT_USE"]) != 0) flux[dnu] = np.nan var[dnu] = np.nan var = np.sqrt(var) if self.linear_wavelength: crval3 = self.crval3 cdelt3 = self.cdelt3 crpix3 = self.crpix3 pixels = np.arange(self.naxis3) # Calculate wavelengths # We add 1 to 'pixels' to convert 0-based Python indexing to 1-based FITS indexing wavelength_table = crval3 + (pixels + 1 - crpix3) * cdelt3 wave = np.asarray(wavelength_table, dtype=np.float32) num = len(wave) alldata = np.array([(wave[None].T,)], dtype=[("wavelength", "<f4", (num, 1))]) # always write the wavetable ifucube_model = datamodels.IFUCubeModel( data=flux, dq=dq, err=var, weightmap=wmap, wavetable=alldata ) else: wave = np.asarray(self.wavelength_table, dtype=np.float32) num = len(wave) alldata = np.array([(wave[None].T,)], dtype=[("wavelength", "<f4", (num, 1))]) ifucube_model = datamodels.IFUCubeModel( data=flux, dq=dq, err=var, weightmap=wmap, wavetable=alldata ) ifucube_model.update(model_ref) ifucube_model.meta.filename = self.output_name # Call model_blender if there are multiple inputs if len(self.input_models) > 1: saved_model_type = ifucube_model.meta.model_type self.blend_output_metadata(ifucube_model) # Reset to original ifucube_model.meta.model_type = saved_model_type # ______________________________________________________________________ # fill in Channel for MIRI if self.instrument == "MIRI": # fill in Channel output meta data num_ch = len(self.list_par1) outchannel = self.list_par1[0] for m in range(1, num_ch): outchannel = outchannel + str(self.list_par1[m]) outchannel = "".join(set(outchannel)) outchannel = "".join(sorted(outchannel)) ifucube_model.meta.instrument.channel = outchannel # single files are created for a single band, if self.output_type == "single": with datamodels.open(model_ref) as input_ref: # define the cubename for each single filename = input_ref.meta.filename indx = filename.rfind(".fits") self.output_name_base = filename[:indx] self.output_file = None newname = self.define_cubename() ifucube_model.meta.filename = newname if self.instrument == "MIRI": outchannel = self.list_par1[0] outband = self.list_par2[0] ifucube_model.meta.instrument.channel = outchannel ifucube_model.meta.instrument.band = outband.upper() else: outgrating = self.list_par1[0] ifucube_model.meta.instrument.grating = outgrating.upper() ifucube_model.meta.wcsinfo.crval1 = self.crval1 ifucube_model.meta.wcsinfo.crval2 = self.crval2 ifucube_model.meta.wcsinfo.crpix1 = self.crpix1 ifucube_model.meta.wcsinfo.crpix2 = self.crpix2 ifucube_model.meta.wcsinfo.cdelt1 = self.cdelt1 / 3600.0 ifucube_model.meta.wcsinfo.cdelt2 = self.cdelt2 / 3600.0 # Now that we've got a pixel scale, set photometric area keywords ifucube_model.meta.photometry.pixelarea_arcsecsq = self.cdelt1 * self.cdelt2 ifucube_model.meta.photometry.pixelarea_steradians = ( ifucube_model.meta.photometry.pixelarea_arcsecsq * 2.3504e-11 ) if self.linear_wavelength: ifucube_model.meta.wcsinfo.crval3 = self.crval3 ifucube_model.meta.wcsinfo.cdelt3 = self.cdelt3 ifucube_model.meta.wcsinfo.ctype3 = "WAVE" ifucube_model.meta.wcsinfo.crpix3 = self.crpix3 ifucube_model.meta.ifu.roi_spatial = float(self.rois) ifucube_model.meta.ifu.roi_wave = float(self.roiw) # even though we are writing a WAVE-TAB we # do not want to set these parameters: # ctype3="WAVE-TAB", # ps3_0 = 'WCS-TABLE' # ps3_1 = 'wavelength' # because some viewers (e.g. DS9) report an incorrect wavelength range else: ifucube_model.meta.wcsinfo.ctype3 = "WAVE-TAB" ifucube_model.meta.wcsinfo.ps3_0 = "WCS-TABLE" ifucube_model.meta.wcsinfo.ps3_1 = "wavelength" ifucube_model.meta.wcsinfo.crval3 = 1.0 ifucube_model.meta.wcsinfo.crpix3 = 1.0 ifucube_model.meta.wcsinfo.cdelt3 = None ifucube_model.meta.ifu.roi_wave = np.mean(self.roiw_table) ifucube_model.wavedim = f"(1,{num:d})" ifucube_model.meta.wcsinfo.ctype1 = "RA---TAN" ifucube_model.meta.wcsinfo.ctype2 = "DEC--TAN" ifucube_model.meta.wcsinfo.cunit1 = "deg" ifucube_model.meta.wcsinfo.cunit2 = "deg" ifucube_model.meta.wcsinfo.cunit3 = "um" ifucube_model.meta.wcsinfo.wcsaxes = 3 ifucube_model.meta.wcsinfo.pc1_1 = -1 ifucube_model.meta.wcsinfo.pc1_2 = 0 ifucube_model.meta.wcsinfo.pc1_3 = 0 ifucube_model.meta.wcsinfo.pc2_1 = 0 ifucube_model.meta.wcsinfo.pc2_2 = 1 ifucube_model.meta.wcsinfo.pc2_3 = 0 ifucube_model.meta.wcsinfo.pc3_1 = 0 ifucube_model.meta.wcsinfo.pc3_2 = 0 ifucube_model.meta.wcsinfo.pc3_3 = 1 if self.rot_angle is None: self.rot_angle = 0.0 ifucube_model.meta.wcsinfo.pc1_1 = -np.cos(self.rot_angle * np.pi / 180.0) ifucube_model.meta.wcsinfo.pc1_2 = np.sin(self.rot_angle * np.pi / 180.0) ifucube_model.meta.wcsinfo.pc2_1 = np.sin(self.rot_angle * np.pi / 180.0) ifucube_model.meta.wcsinfo.pc2_2 = np.cos(self.rot_angle * np.pi / 180.0) ifucube_model.meta.ifu.flux_extension = "SCI" ifucube_model.meta.ifu.error_extension = "ERR" ifucube_model.meta.ifu.error_type = "ERR" ifucube_model.meta.ifu.dq_extension = "DQ" ifucube_model.meta.ifu.weighting = str(self.weighting) # weight_power is needed for single cubes. Linear Wavelengths # if non-linear wavelengths then this will be None ifucube_model.meta.ifu.weight_power = self.weight_power with datamodels.open(model_ref) as input_ref: ifucube_model.meta.bunit_data = input_ref.meta.bunit_data ifucube_model.meta.bunit_err = input_ref.meta.bunit_err if self.interpolation == "drizzle": # stick in values of 0, otherwise it is NaN and # fits file can not be written because these # values are defined in ifucube.schema.yaml ifucube_model.meta.ifu.weight_power = 0 ifucube_model.meta.ifu.roi_wave = 0 ifucube_model.meta.ifu.roi_spatial = 0 ifucube_model.meta.ifu.weighting = str(self.interpolation) if self.coord_system == "internal_cal": # stick in values of 0, otherwise it is NaN and # fits file can not be written because these # values are defined in ifucube.schema.yaml ifucube_model.meta.ifu.weight_power = 0 ifucube_model.meta.ifu.roi_wave = 0 ifucube_model.meta.ifu.roi_spatial = 0 # un-correct cdelt for degree conversion ifucube_model.meta.wcsinfo.cdelt1 *= 3600.0 ifucube_model.meta.wcsinfo.cdelt2 *= 3600.0 # correct "RA" axis orientation ifucube_model.meta.wcsinfo.pc1_1 *= -1.0 if self.instrument == "MIRI": ifucube_model.meta.wcsinfo.cunit1 = "arcsec" ifucube_model.meta.wcsinfo.cunit2 = "arcsec" ifucube_model.meta.wcsinfo.ctype1 = "MRSALPHA" ifucube_model.meta.wcsinfo.ctype2 = "MRSBETA" if self.instrument == "NIRSPEC": ifucube_model.meta.wcsinfo.cunit1 = "meter" ifucube_model.meta.wcsinfo.cunit2 = "meter" ifucube_model.meta.wcsinfo.ctype1 = "NRSSLICERX" ifucube_model.meta.wcsinfo.ctype2 = "NRSSLICERY" # set WCS information wcsobj = pointing.create_fitswcs(ifucube_model) ifucube_model.meta.wcs = wcsobj ifucube_model.meta.wcs.bounding_box = ( (0, self.naxis1 - 1), (0, self.naxis2 - 1), (0, self.naxis3 - 1), ) ifucube_model.meta.cal_step.cube_build = "COMPLETE" # problem with cube_build - contains only 0 data if status == 1: ifucube_model.meta.cal_step.cube_build = "SKIPPED" result = (ifucube_model, status) return result
# ________________________________________________________________________________
[docs] def blend_output_metadata(self, ifu_cube): """ Create new output metadata based on blending all input metadata. Parameters ---------- ifu_cube : `~stdatamodels.jwst.datamodels.IFUCubeModel` IFU cube data model """ blendmeta.blendmodels( ifu_cube, self.input_models_this_cube, ignore=[ "meta.filename", ], ) # For moving targets, set RA, Dec equal to the average mt_avra = getattr(self.input_models_this_cube[0].meta.wcsinfo, "mt_avra", None) mt_avdec = getattr(self.input_models_this_cube[0].meta.wcsinfo, "mt_avdec", None) if mt_avra is not None: ifu_cube.meta.wcsinfo.mt_ra = mt_avra ifu_cube.meta.wcsinfo.mt_dec = mt_avdec ifu_cube.meta.target.ra = mt_avra ifu_cube.meta.target.dec = mt_avdec
# ________________________________________________________________________________
[docs] def find_ra_dec_offset(self, filename): """ For the given filename find the RA and Dec offset to apply. Parameters ---------- filename : str Filename that holds the RA and Dec offset to apply Returns ------- raoffset : float Right ascension offset value read in from file decoffset ; float Declination offset valuet read in from file """ index = self.offsets["filename"].index(filename) raoffset = self.offsets["raoffset"][index] decoffset = self.offsets["decoffset"][index] return raoffset, decoffset
# ________________________________________________________________________________
[docs] def offset_coord(self, ra, dec, raoffset, decoffset): """ Given a RA, Dec, RA offset, and Dec offset, use `~astropy.coordinates.SkyCoord` to apply the offsets. Parameters ---------- ra : float Right ascension coordinate to offset dec : float Declination coordinate to offset raoffset : float Right ascension offset to apply decoffset : float Declination offset to apply Returns ------- raw_new : float Right ascension coordinate with offset applied dec_new : float Declination coordinate with offset applied """ # noqa: E501 coord = SkyCoord(ra, dec, unit="deg") coord_new = coord.spherical_offsets_by(raoffset, decoffset) ra_new = coord_new.ra.value dec_new = coord_new.dec.value return ra_new, dec_new
[docs] class IncorrectInputError(Exception): """Interpolation=area when more than 1 file is used to build cube.""" pass
[docs] class IncorrectParameterError(Exception): """Cube building parameter is NaN.""" pass