import logging
import numpy as np
from stdatamodels.jwst import datamodels
log = logging.getLogger(__name__)
__all__ = [
"is_subarray",
"ref_matches_sci",
"get_subarray_model",
"get_multistripe_subarray_model",
"stripe_read",
"generate_stripe_array",
"science_detector_frame_transform",
"detector_science_frame_transform",
"MatchRowError",
"find_row",
]
[docs]
def is_subarray(input_model):
"""
Check if a data model comes from a subarray readout.
If the data dimensions are less than 2048x2048 (or 1032x1024
for MIRI), it is assumed to be a subarray.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Input data model to be checked.
Returns
-------
status : bool
`True` if the primary data array of the model is smaller than
full-frame, `False` otherwise.
"""
# Get the dimensions of the model's primary data array
nrows = input_model.data.shape[-2]
ncols = input_model.data.shape[-1]
# Compare the dimensions to a full-frame
if input_model.meta.instrument.name.upper() == "MIRI":
if ncols < 1032 or nrows < 1024:
log.debug("Input exposure is a subarray readout")
return True
else:
return False
else:
if ncols < 2048 or nrows < 2048:
log.debug("Input exposure is a subarray readout")
return True
else:
return False
[docs]
def ref_matches_sci(sci_model, ref_model):
"""
Match science model to reference model.
Check to see if the science model has the same subarray
characteristics as the reference model. Also performs error
checking on the subarray keywords in both the reference file
and science data.
Parameters
----------
sci_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model.
ref_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Reference file data model.
Returns
-------
status : bool
`True` if the science model has the same subarray parameters
as the reference model, `False` otherwise.
"""
# Get the reference file subarray parameters
xstart_ref = ref_model.meta.subarray.xstart
xsize_ref = ref_model.meta.subarray.xsize
ystart_ref = ref_model.meta.subarray.ystart
ysize_ref = ref_model.meta.subarray.ysize
# Make sure the attributes were populated
if (xstart_ref is None) or (xsize_ref is None) or (ystart_ref is None) or (ysize_ref is None):
# If the ref file is full-frame, set the missing params to
# default values
if ref_model.meta.instrument.name.upper() == "MIRI":
if ref_model.data.shape[-1] == 1032 and ref_model.data.shape[-2] == 1024:
log.warning("Missing subarray corner/size keywords in reference file")
log.warning("Setting them to full-frame default values")
xstart_ref = 1
ystart_ref = 1
xsize_ref = 1032
ysize_ref = 1024
ref_model.meta.subarray.xstart = xstart_ref
ref_model.meta.subarray.ystart = ystart_ref
ref_model.meta.subarray.xsize = xsize_ref
ref_model.meta.subarray.ysize = ysize_ref
else:
log.error("Missing subarray corner/size keywords in reference file")
raise ValueError("Can't determine ref file subarray properties")
else:
if ref_model.data.shape[-1] == 2048 and ref_model.data.shape[-2] == 2048:
log.warning("Missing subarray corner/size keywords in reference file")
log.warning("Setting them to full-frame default values")
xstart_ref = 1
ystart_ref = 1
xsize_ref = 2048
ysize_ref = 2048
ref_model.meta.subarray.xstart = xstart_ref
ref_model.meta.subarray.ystart = ystart_ref
ref_model.meta.subarray.xsize = xsize_ref
ref_model.meta.subarray.ysize = ysize_ref
else:
log.error("Missing subarray corner/size keywords in reference file")
raise ValueError("Can't determine ref file subarray properties")
log.debug(
"ref substrt1=%d, subsize1=%d, substrt2=%d, subsize2=%d",
xstart_ref,
xsize_ref,
ystart_ref,
ysize_ref,
)
# Make sure the starting corners are valid
if xstart_ref < 1 or ystart_ref < 1:
log.error("Reference file subarray corners are invalid")
raise ValueError("Bad subarray corners")
# Make sure the size attributes are consistent with data array
if hasattr(ref_model, "coeffs"):
xsize_data = ref_model.coeffs.shape[-1]
ysize_data = ref_model.coeffs.shape[-2]
else:
xsize_data = ref_model.shape[-1]
ysize_data = ref_model.shape[-2]
# Check x dimensions
if xsize_ref != xsize_data:
log.warning("Reference file data array size doesn't match SUBSIZE1")
log.warning("Using actual array size")
xsize_ref = xsize_data
# Check y dimensions
if ysize_ref != ysize_data:
# NIRSpec IRS2 is a special mode, where it's allowed to have
# a mismatch in the y-size.
if ysize_ref == 2048 and ysize_data == 3200:
pass
else:
log.warning("Reference file data array size doesn't match SUBSIZE2")
log.warning("Using actual array size")
ysize_ref = ysize_data
# Get the science model subarray parameters
try:
# If the science data model has already gone through
# extract_2d, the metadata is in the top level of the model
xstart_sci = sci_model.xstart
xsize_sci = sci_model.xsize
ystart_sci = sci_model.ystart
ysize_sci = sci_model.ysize
except AttributeError:
# Otherwise the metadata is in the meta tree
xstart_sci = sci_model.meta.subarray.xstart
xsize_sci = sci_model.meta.subarray.xsize
ystart_sci = sci_model.meta.subarray.ystart
ysize_sci = sci_model.meta.subarray.ysize
# Make sure the attributes were populated
if (xstart_sci is None) or (xsize_sci is None) or (ystart_sci is None) or (ysize_sci is None):
log.error("Missing subarray corner/size keywords in science file")
raise ValueError("Can't determine science file subarray properties")
else:
log.debug(
"sci substrt1=%d, subsize1=%d, substrt2=%d, subsize2=%d",
xstart_sci,
xsize_sci,
ystart_sci,
ysize_sci,
)
# Make sure the starting corners are valid
if xstart_sci < 1 or ystart_sci < 1:
log.error("Science file subarray corners are invalid")
raise ValueError("Bad subarray corners")
# Make sure the size attributes are consistent with data array.
# Check x dimensions
if xsize_sci != sci_model.shape[-1]:
log.warning("Science file data array size doesn't match SUBSIZE1")
log.warning("Using actual array size")
xsize_sci = sci_model.shape[-1]
# Check y dimensions
if ysize_sci != sci_model.shape[-2]:
# NIRSpec IRS2 is a special mode, where it's allowed to have
# a mismatch in the y-size.
if ysize_sci == 2048 and sci_model.shape[-2] == 3200:
pass
else:
log.warning("Science file data array size doesn't match SUBSIZE2")
log.warning("Using actual array size")
ysize_sci = sci_model.shape[-2]
# Finally, see if all of the science file subarray params match those
# of the reference file
if (
xstart_ref == xstart_sci
and xsize_ref == xsize_sci
and ystart_ref == ystart_sci
and ysize_ref == ysize_sci
):
return True
else:
return False
[docs]
def get_subarray_model(sci_model, ref_model):
"""
Get subarray model.
Create a subarray version of a reference file model that matches
the subarray characteristics of a science data model. A new
model is created that contains subarrays of all data arrays
contained in the reference file model.
Parameters
----------
sci_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model.
ref_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Reference file data model.
Returns
-------
sub_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Subarray version of the reference file model.
"""
# If science data is in multistripe readout, use
# multistripe-specific subarray reconstruction.
if not isinstance(sci_model, datamodels.JwstDataModel):
raise TypeError("Science model must be a JWST data model.")
if not isinstance(ref_model, datamodels.JwstDataModel):
raise TypeError("Reference model must be a JWST data model.")
if getattr(sci_model.meta.subarray, "multistripe_reads1", None) is not None:
return get_multistripe_subarray_model(sci_model, ref_model)
# Get the science model subarray params
xstart_sci = sci_model.meta.subarray.xstart
xsize_sci = sci_model.meta.subarray.xsize
ystart_sci = sci_model.meta.subarray.ystart
ysize_sci = sci_model.meta.subarray.ysize
# Get the reference model subarray params
xstart_ref = ref_model.meta.subarray.xstart
ystart_ref = ref_model.meta.subarray.ystart
xsize_ref = ref_model.meta.subarray.xsize
ysize_ref = ref_model.meta.subarray.ysize
# Compute the slice indexes, in 0-indexed python frame
xstart = xstart_sci - xstart_ref
ystart = ystart_sci - ystart_ref
xstop = xstart + xsize_sci
ystop = ystart + ysize_sci
log.debug("slice xstart=%d, xstop=%d, ystart=%d, ystop=%d", xstart, xstop, ystart, ystop)
# Make sure that the slice limits are within the bounds of
# the reference file data array
if (
xstart < 0
or ystart < 0
or xstop > ref_model.meta.subarray.xsize
or ystop > ref_model.meta.subarray.ysize
):
log.error(
"Computed reference file slice indexes are incompatible with "
"size of reference data array"
)
log.error(
"Science: SUBSTRT1=%d, SUBSTRT2=%d, SUBSIZE1=%d, SUBSIZE2=%d",
xstart_sci,
ystart_sci,
xsize_sci,
ysize_sci,
)
log.error(
"Reference: SUBSTRT1=%d, SUBSTRT2=%d, SUBSIZE1=%d, SUBSIZE2=%d",
xstart_ref,
ystart_ref,
xsize_ref,
ysize_ref,
)
log.error(
"Slice indexes: xstart=%d, xstop=%d, ystart=%d, ystop=%d", xstart, xstop, ystart, ystop
)
raise ValueError("Bad reference file slice indexes")
# Extract subarrays from each data attribute in the particular
# type of reference file model and return a new copy of the
# data model
model_type = ref_model.__class__
sub_model = model_type()
primary = ref_model.get_primary_array_name()
for attr in {primary, "err", "dq"}:
if ref_model.hasattr(attr):
sub_data = getattr(ref_model, attr)[..., ystart:ystop, xstart:xstop]
setattr(sub_model, attr, sub_data)
sub_model.update(ref_model)
return sub_model
[docs]
def get_multistripe_subarray_model(sci_model, ref_model):
"""
Create a multistripe subarray cutout of a reference file.
Use the multistripe subarray characteristics of a science
data model to generate a new reference file datamodel,
containing subarray cutouts of all relevant data arrays
contained in the reference file model.
Parameters
----------
sci_model : JWST data model
The science data model.
ref_model : JWST data model
The reference file data model.
Returns
-------
sub_model : JWST data model
Subarray cutout reference file model.
"""
if isinstance(ref_model, datamodels.MaskModel):
sub_model = stripe_read(sci_model, ref_model, ["dq"])
elif isinstance(ref_model, datamodels.GainModel):
sub_model = stripe_read(sci_model, ref_model, ["data"])
elif isinstance(ref_model, datamodels.LinearityModel):
sub_model = stripe_read(sci_model, ref_model, ["coeffs", "dq"])
elif isinstance(ref_model, datamodels.ReadnoiseModel):
sub_model = stripe_read(sci_model, ref_model, ["data"])
elif isinstance(ref_model, datamodels.SaturationModel):
sub_model = stripe_read(sci_model, ref_model, ["data", "dq"])
elif isinstance(ref_model, datamodels.SuperBiasModel):
sub_model = stripe_read(sci_model, ref_model, ["data", "err", "dq"])
else:
log.warning("Unsupported reference file model type for multistripe subarray cutouts.")
sub_model = None
return sub_model
[docs]
def stripe_read(sci_model, ref_model, attribs):
"""
Generate sub-model from science model multistripe params.
Parameters
----------
sci_model, ref_model : DataModel
Science and reference models, respectively.
attribs : list of str
Attributes in the model to process.
Returns
-------
sub_model : DataModel
Generated sub-model.
"""
# Get the science model multistripe params
sci_meta = sci_model.meta
num_superstripe = getattr(sci_meta.subarray, "num_superstripe", 0)
# We need to extract the number of science integrations in this file, which is tangled up with
# - the number of integrations in the exposure
# - the number of superstripes each science integration may be divided between
# Substripe data may have 0 for num_superstripe, so we use 1 as the integration divisor
int_divisor = max(num_superstripe, 1)
sci_nints = sci_model.data.shape[0] // int_divisor
# Get the reference model subarray params
if num_superstripe > 0:
sub_model = datamodels.ReferenceFileModel()
else:
sub_model = type(ref_model)()
sub_model.update(ref_model)
for attrib in attribs:
ref_array = getattr(ref_model, attrib)
# Apply subarray shape in fastaxis; slowaxis cutouts determined in generate_stripe_array
if np.abs(sci_meta.subarray.fastaxis) == 1:
faststart_sci = sci_model.meta.subarray.xstart
fastsize_sci = sci_meta.subarray.xsize
# Get the reference model subarray params
faststart_ref = ref_model.meta.subarray.xstart
# Compute the slice indexes, in 0-indexed python frame
faststart = faststart_sci - faststart_ref
faststop = faststart + fastsize_sci
ref_array = ref_array[..., faststart:faststop]
else:
faststart_sci = sci_model.meta.subarray.ystart
fastsize_sci = sci_meta.subarray.ysize
# Get the reference model subarray params
faststart_ref = ref_model.meta.subarray.ystart
# Compute the slice indexes, in 0-indexed python frame
faststart = faststart_sci - faststart_ref
faststop = faststart + fastsize_sci
ref_array = ref_array[..., faststart:faststop, :]
sub_model[attrib] = generate_stripe_array(ref_array, sci_meta, sci_nints)
return sub_model
[docs]
def generate_stripe_array(ref_array, sci_meta, sci_nints):
"""
Generate stripe array.
Parameters
----------
ref_array : np.array
The scene to be sliced.
sci_meta : `~stdatamodels.properties.ObjectNode`
The science datamodel metadata tree.
sci_nints : int
The number of science integrations in the science datamodel. Not equivalent to nints when
the science exposure is segmented.
Returns
-------
stripe_out : ndarray
Generated stripe array.
"""
# Extract science metadata
nreads1 = sci_meta.subarray.multistripe_reads1
nskips1 = sci_meta.subarray.multistripe_skips1
nreads2 = sci_meta.subarray.multistripe_reads2
nskips2 = sci_meta.subarray.multistripe_skips2
repeat_stripe = sci_meta.subarray.repeat_stripe
interleave_reads1 = sci_meta.subarray.interleave_reads1
superstripe_step = sci_meta.subarray.superstripe_step
num_superstripe = sci_meta.subarray.num_superstripe
xsize_sci = sci_meta.subarray.xsize
ysize_sci = sci_meta.subarray.ysize
fastaxis = sci_meta.subarray.fastaxis
slowaxis = sci_meta.subarray.slowaxis
ngroups = sci_meta.exposure.ngroups
# Transform science data to detector frame
ref_array = science_detector_frame_transform(ref_array, fastaxis, slowaxis)
ref_shape = ref_array.shape
if np.abs(fastaxis) == 1:
slow_size = ysize_sci
fast_size = xsize_sci
else:
slow_size = xsize_sci
fast_size = ysize_sci
if num_superstripe == 0:
# SUBSTRIPE MODE
stripe_out = np.zeros((*ref_shape[:-2], slow_size, fast_size), dtype=ref_array.dtype)
# Track the read position in the full frame with linecount, and number of lines
# read into subarray with sub_lines
linecount = 0
sub_lines = 0
# Start at 0, make nreads1 row reads
stripe_out[..., sub_lines : sub_lines + nreads1, :] = ref_array[
..., linecount : linecount + nreads1, :
]
linecount += nreads1
sub_lines += nreads1
# Now skip nskips1
linecount += nskips1
# Nreads2
stripe_out[..., sub_lines : sub_lines + nreads2, :] = ref_array[
..., linecount : linecount + nreads2, :
]
linecount += nreads2
sub_lines += nreads2
# Now, while the output size is less than the science array size:
# 1a. If repeat_stripe, reset linecount (HEAD) to initial position
# after every nreads2.
# 1b. Else, do nskips2 followed by nreads2 until subarray complete.
# 2. Following 1a., repeat sequence of nreads1, skips*, nreads2
# until complete. For skips*:
# 3a. If interleave_reads1, value of skips increments by nreads2 +
# nskips2 for each stripe read.
# 3b. If not interleave, each loop after linecount reset is simply
# nreads1 + nskips1 + nreads2.
interleave_skips = nskips1
if nreads2 <= 0:
raise ValueError(
"Invalid value for multistripe_reads2 - "
"cutout for reference file could not be "
"generated!"
)
while sub_lines < slow_size:
# If repeat_stripe, add interleaved rows to output and increment sub_lines
if repeat_stripe > 0:
linecount = 0
stripe_out[..., sub_lines : sub_lines + nreads1, :] = ref_array[
..., linecount : linecount + nreads1, :
]
linecount += nreads1
sub_lines += nreads1
if interleave_reads1:
interleave_skips += nskips2 + nreads2
linecount += interleave_skips
else:
linecount += nskips1
else:
linecount += nskips2
stripe_out[..., sub_lines : sub_lines + nreads2, :] = ref_array[
..., linecount : linecount + nreads2, :
]
linecount += nreads2
sub_lines += nreads2
if sub_lines != slow_size:
raise ValueError(
"Stripe readout resulted in mismatched reference array shape "
"with respect to science array!"
)
else:
# SUPERSTRIPE MODE
# First alter subarray shape to broadcast stripe size to fill "full" subarray
ref_native_dims = len(ref_shape)
if len(ref_shape) == 2:
ref_array = ref_array[np.newaxis, np.newaxis, :].repeat(ngroups, axis=1)
ref_shape = ref_array.shape
if len(ref_shape) == 4:
ref_nints, _, ysize, xsize = ref_shape
else:
raise ValueError(f"Unsupported shape: len(ref_shape) == {len(ref_array.shape)}")
# If reference file has more integrations than science, only process through
# necessary integrations.
# If science has more integrations, we'll broadcast the reference arrays to
# match the number of arrays in the science frame.
nints = min(ref_nints, sci_nints)
stripe_out = np.zeros(
(nints * num_superstripe, ngroups, slow_size, fast_size), dtype=ref_array.dtype
)
for integ in range(nints):
for stripe in range(num_superstripe):
# Track the read position in the full frame with linecount, and number of lines
# read into subarray with sub_lines
linecount = 0
sub_lines = 0
# Start at 0, make nreads1 row reads
stripe_out[
integ * num_superstripe + stripe, :, sub_lines : sub_lines + nreads1, :
] = ref_array[integ, :, linecount : linecount + nreads1, :]
linecount += nreads1
sub_lines += nreads1
# Now skip nskips1 + superstripe_step * stripe
linecount += nskips1 + superstripe_step * stripe
# Nreads2
stripe_out[
integ * num_superstripe + stripe, :, sub_lines : sub_lines + nreads2, :
] = ref_array[integ, :, linecount : linecount + nreads2, :]
linecount += nreads2
sub_lines += nreads2
# Now, while the output size is less than the science array size:
# 1a. If repeat_stripe, reset linecount (HEAD) to initial position
# after every nreads2.
# 1b. Else, do nskips2 followed by nreads2 until subarray complete.
# 2. Following 1a., repeat sequence of nreads1, skips*, nreads2
# until complete. For skips*:
# 3a. If interleave_reads1, value of skips increments by nreads2 +
# nskips2 for each stripe read.
# 3b. If not interleave, each loop after linecount reset is simply
# nreads1 + nskips1 + nreads2.
interleave_skips = nskips1
while sub_lines < slow_size:
# If repeat_stripe, add interleaved rows to output and increment sub_lines
if repeat_stripe > 0:
linecount = 0
stripe_out[..., sub_lines : sub_lines + nreads1, :] = ref_array[
..., linecount : linecount + nreads1, :
]
linecount += nreads1
sub_lines += nreads1
if interleave_reads1:
interleave_skips += nskips2 + nreads2
linecount += interleave_skips
else:
linecount += nskips1
else:
linecount += nskips2
stripe_out[..., sub_lines : sub_lines + nreads2, :] = ref_array[
..., linecount : linecount + nreads2, :
]
linecount += nreads2
sub_lines += nreads2
if sub_lines != slow_size:
raise ValueError(
"Stripe readout resulted in mismatched reference array shape "
"with respect to science array!"
)
# If multistripe but ref array is typically 2-D:
# rather than broadcast a 2-D array into many wasted dims, just provide the minimal
# 3-D array, with one slice per superstripe in the third dimension. Expect steps
# to handle the extra dimension.
if ref_native_dims == 2:
stripe_out = stripe_out[:, 0, :, :]
# If multistripe and output currently has one plane per stripe only,
# and reference file is expected to have 4 dimensions,
# broadcast arrays into sci_nints copies so that direct application
# of the reference array to the science array is possible
elif stripe_out.shape[0] == num_superstripe:
stripe_out = np.tile(stripe_out, reps=(sci_nints, 1, 1, 1))
# Transform from detector frame back to science frame
stripe_out = detector_science_frame_transform(stripe_out, fastaxis, slowaxis)
return stripe_out
[docs]
class MatchRowError(Exception):
"""Raised when more than one row is matched in a FITS table or list of dict."""
def __init__(self, message):
if message is None:
message = "Expected to match one row only."
super().__init__(message)
[docs]
def find_row(ldict, match_keys):
"""
Find a row in a FITS table matching fields.
Parameters
----------
ldict : list of dict
A list of dictionaries, The dictionaries may have any number
of items but must include all keys in ``match_keys``.
match_keys : dict
``{key: value}`` pairs are matched against all items in ``ldict``
to find the dict which matches them.
Returns
-------
row : int or None
FITS table row index, None if no match.
Raises
------
Warning
When a field name is not in the table.
MatchFitsTableRowError
When more than one rows match.
Examples
--------
>>> ldict = [
... {"row_offset": 2.1, "col_offset": 1.3, "filter": "F444W", "pupil": "CLEAR"},
... {"row_offset": 1, "col_offset": 3, "filter": "F277W", "pupil": "FLAT"},
... ]
>>> match_keys = {"filter": "F444W", "pupil": "CLEAR"}
>>> result = find_row(ldict, match_keys)
>>> print(result)
{'row_offset': 2.1, 'col_offset': 1.3, 'filter': 'F444W', 'pupil': 'CLEAR'}
"""
def _normalize_strings(field):
if isinstance(field[0], str):
return np.array([s.upper() for s in field])
return field
# item[1] is always converted to upper case in the `DataSet` initializer.
results = []
for d in ldict:
row = [d[key] == match_keys[key] for key in match_keys]
if all(row):
results.append(d)
if len(results) > 1:
raise MatchRowError(f"Expected to find one matching row in table, found {len(results)}.")
if len(results) == 0:
log.warning("Expected to find one matching row in table, found 0.")
return None
return results[0]