"""The ``tweakreg_catalog`` module provides functions for generating catalogs of sources from images.""" # noqa: E501
import inspect
import logging
import warnings
import astropy.units as u
import numpy as np
import photutils
from astropy.convolution import Gaussian2DKernel, convolve
from astropy.stats import SigmaClip, gaussian_fwhm_to_sigma
from astropy.table import Table
from astropy.utils import lazyproperty, minversion
from astropy.utils.exceptions import AstropyUserWarning
from photutils.background import Background2D, MedianBackground
from photutils.detection import DAOStarFinder, IRAFStarFinder
from photutils.segmentation import SourceCatalog, SourceFinder
from photutils.segmentation.catalog import DEFAULT_COLUMNS
from photutils.utils import NoDetectionsWarning
from stdatamodels.jwst.datamodels import ImageModel, dqflags
log = logging.getLogger(__name__)
__all__ = ["make_tweakreg_catalog"]
PHOTUTILS_GE_3 = minversion(photutils, "2.3.1.dev")
SOURCECAT_COLUMNS = DEFAULT_COLUMNS + [
"ellipticity",
"sky_bbox_ll",
"sky_bbox_ul",
"sky_bbox_lr",
"sky_bbox_ur",
]
class JWSTBackground:
"""
Class to estimate a 2D background and background RMS noise in an image.
Parameters
----------
data : ndarray
The input 2D image for which to estimate the background.
box_size : int or array-like (int)
The box size along each axis. If ``box_size`` is a scalar then
a square box of size ``box_size`` will be used. If ``box_size``
has two elements, they should be in ``(ny, nx)`` order.
coverage_mask : array-like (bool), optional
A boolean mask, with the same shape as ``data``, where a `True`
value indicates the corresponding element of ``data`` is masked.
Masked data are excluded from calculations. ``coverage_mask``
should be `True` where there is no coverage (i.e., no data) for
a given pixel (e.g., blank areas in a mosaic image). It should
not be used for bad pixels.
"""
def __init__(self, data, box_size=100, coverage_mask=None):
self.data = data
self.box_size = np.asarray(box_size).astype(int) # must be integer
self.coverage_mask = coverage_mask
@lazyproperty
def _background2d(self):
"""
Estimate the 2D background and background RMS noise in an image.
Returns
-------
background : `photutils.background.Background2D`
A Background2D object containing the 2D background and
background RMS noise estimates.
"""
sigma_clip = SigmaClip(sigma=3.0)
bkg_estimator = MedianBackground()
filter_size = (3, 3)
# All data have NaNs. Suppress warnings about them.
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=AstropyUserWarning)
try:
bkg = Background2D(
self.data,
self.box_size,
filter_size=filter_size,
coverage_mask=self.coverage_mask,
sigma_clip=sigma_clip,
bkg_estimator=bkg_estimator,
)
except ValueError:
# use the entire unmasked array
bkg = Background2D(
self.data,
self.data.shape,
filter_size=filter_size,
coverage_mask=self.coverage_mask,
sigma_clip=sigma_clip,
bkg_estimator=bkg_estimator,
exclude_percentile=100.0,
)
log.info(
"Background could not be estimated in meshes. "
"Using the entire unmasked array for background "
f"estimation: bkg_boxsize={self.data.shape}."
)
return bkg
@lazyproperty
def background(self):
"""
Compute the 2-D background if it has not been computed yet, then return it.
Returns
-------
background : ndarray
The 2D background image.
"""
return self._background2d.background
@lazyproperty
def background_rms(self):
"""
Compute the 2-D background RMS image if it has not been computed yet, then return it.
Returns
-------
background_rms : ndarray
The 2D background RMS image.
"""
return self._background2d.background_rms
def _convolve_data(data, kernel_fwhm, mask=None):
"""
Convolve the data with a Gaussian2D kernel.
Parameters
----------
data : ndarray
The 2D array to convolve.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the 2D Gaussian kernel.
mask : array-like, bool, optional
A boolean mask with the same shape as ``data``, where a `True`
value indicates the corresponding element of ``data`` is masked.
Returns
-------
convolved_data : ndarray
The convolved 2D array.
"""
sigma = kernel_fwhm * gaussian_fwhm_to_sigma
kernel = Gaussian2DKernel(sigma)
# All data have NaNs. Suppress warnings about them.
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=AstropyUserWarning)
return convolve(data, kernel, mask=mask)
def _rename_columns(sources):
"""
Rename catalog columns and add astropy units to be consistent between the three star finders.
Table is modified in place.
Parameters
----------
sources : `~astropy.table.QTable`
The source catalog table for which to rename columns.
"""
rename_map = {
"pa": "orientation", # for iraf and dao star finder compatibility with source_catalog step
"npix": "area", # for iraf and dao star finder compatibility with source_catalog step
"segment_flux": "flux", # for sourcefinder compatibility with tweakreg step
"label": "id", # for sourcefinder compatibility with tweakreg step
}
for old_col, new_col in rename_map.items():
if old_col in sources.colnames:
sources.rename_column(old_col, new_col)
units_map = {"orientation": u.deg}
for col, unit in units_map.items():
if col in sources.colnames:
sources[col] = u.Quantity(sources[col], unit=unit)
def _sourcefinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of SourceFinder consistent with IRAFStarFinder and DAOStarFinder.
Wrapper function for photutils.source_finder.SourceFinder to make input
and output consistent with DAOStarFinder and IRAFStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the 2D Gaussian kernel.
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.segmentation.SourceFinder`
and/or `photutils.segmentation.SourceCatalog`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
References
----------
:ref:`photutils segmentation tutorial <photutils:image_segmentation>`.
"""
default_kwargs = {
"npixels": 10,
"progress_bar": False,
}
kwargs = {**default_kwargs, **kwargs}
# convolve the data with a Gaussian kernel
if kernel_fwhm > 0:
conv_data = _convolve_data(data, kernel_fwhm, mask=mask)
else:
conv_data = data
# handle passing kwargs into SourceFinder and SourceCatalog
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(SourceFinder).parameters)
catalog_args = list(inspect.signature(SourceCatalog).parameters)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
catalog_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in catalog_args}
if ("kron_params" in catalog_dict.keys()) and (catalog_dict["kron_params"] is None):
# necessary because cannot specify default in Step spec string
catalog_dict["kron_params"] = (2.5, 1.4, 0.0)
finder = SourceFinder(**finder_dict)
segment_map = finder(conv_data, threshold_img, mask=mask)
if segment_map is None:
return None, None
sources = SourceCatalog(
data,
segment_map,
mask=mask,
convolved_data=conv_data,
**catalog_dict,
).to_table(columns=SOURCECAT_COLUMNS)
return sources, segment_map
def _translate_starfinder_kwargs(
kwargs,
kernel_fwhm,
sharplo_default,
sharphi_default,
roundlo_default,
roundhi_default,
):
"""
Translate starfinder keyword arguments for cross-version compatibility.
For photutils >= 3.0, old-style keyword arguments are translated
to their new equivalents:
- ``sharplo``, ``sharphi`` -> ``sharpness_range``
- ``roundlo``, ``roundhi`` -> ``roundness_range``
- ``peakmax`` -> ``peak_max``
- ``brightest`` -> ``n_brightest``
- ``minsep_fwhm`` -> ``min_separation``
For photutils < 3.0, new-style keyword arguments are translated
to their old equivalents:
- ``sharpness_range`` -> ``sharplo``, ``sharphi``
- ``roundness_range`` -> ``roundlo``, ``roundhi``
- ``peak_max`` -> ``peakmax``
- ``n_brightest`` -> ``brightest``
If both old-style and new-style keyword arguments are provided
for the same parameter, the style matching the installed photutils
version takes precedence.
Parameters
----------
kwargs : dict
The keyword arguments to translate.
kernel_fwhm : float
The FWHM of the Gaussian kernel, used for ``minsep_fwhm``
conversion to ``min_separation`` in pixels.
sharplo_default : float
Default lower bound for the sharpness range.
sharphi_default : float
Default upper bound for the sharpness range.
roundlo_default : float
Default lower bound for the roundness range.
roundhi_default : float
Default upper bound for the roundness range.
Returns
-------
kwargs : dict
A copy of the input kwargs with keywords translated to match
the installed photutils version.
"""
kwargs = kwargs.copy() # avoid modifying original dict
if PHOTUTILS_GE_3:
# Translate old-style to new-style. If new-style kwargs are
# already present, they take precedence.
if "sharpness_range" not in kwargs:
sharplo = kwargs.pop("sharplo", sharplo_default)
sharphi = kwargs.pop("sharphi", sharphi_default)
kwargs["sharpness_range"] = (sharplo, sharphi)
else:
kwargs.pop("sharplo", None)
kwargs.pop("sharphi", None)
if "roundness_range" not in kwargs:
roundlo = kwargs.pop("roundlo", roundlo_default)
roundhi = kwargs.pop("roundhi", roundhi_default)
kwargs["roundness_range"] = (roundlo, roundhi)
else:
kwargs.pop("roundlo", None)
kwargs.pop("roundhi", None)
if "peak_max" not in kwargs and "peakmax" in kwargs:
kwargs["peak_max"] = kwargs.pop("peakmax")
else:
kwargs.pop("peakmax", None)
if "n_brightest" not in kwargs and "brightest" in kwargs:
kwargs["n_brightest"] = kwargs.pop("brightest")
else:
kwargs.pop("brightest", None)
if "min_separation" not in kwargs and "minsep_fwhm" in kwargs:
minsep = kwargs.pop("minsep_fwhm")
kwargs["min_separation"] = max(2, int(minsep * kernel_fwhm + 0.5))
else:
kwargs.pop("minsep_fwhm", None)
else:
# Translate new-style to old-style. If old-style kwargs are
# already present, they take precedence.
if "sharplo" not in kwargs and "sharphi" not in kwargs:
if "sharpness_range" in kwargs:
sr = kwargs.pop("sharpness_range")
kwargs["sharplo"] = sr[0]
kwargs["sharphi"] = sr[1]
else:
kwargs.pop("sharpness_range", None)
if "roundlo" not in kwargs and "roundhi" not in kwargs:
if "roundness_range" in kwargs:
rr = kwargs.pop("roundness_range")
kwargs["roundlo"] = rr[0]
kwargs["roundhi"] = rr[1]
else:
kwargs.pop("roundness_range", None)
if "peakmax" not in kwargs and "peak_max" in kwargs:
kwargs["peakmax"] = kwargs.pop("peak_max")
else:
kwargs.pop("peak_max", None)
if "brightest" not in kwargs and "n_brightest" in kwargs:
kwargs["brightest"] = kwargs.pop("n_brightest")
else:
kwargs.pop("n_brightest", None)
return kwargs
def _iraf_starfinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of IRAFStarFinder consistent with SourceFinder and DAOStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.detection.IRAFStarFinder`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(IRAFStarFinder).parameters)
kwargs = _translate_starfinder_kwargs(
kwargs,
kernel_fwhm,
sharplo_default=0.5,
sharphi_default=2.0,
roundlo_default=0.0,
roundhi_default=0.2,
)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
threshold = np.median(threshold_img) # only float is supported, not per-pixel value
starfind = IRAFStarFinder(threshold, kernel_fwhm, **finder_dict)
sources = starfind(data, mask=mask)
return sources, None
def _dao_starfinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of DAOStarFinder consistent with SourceFinder and IRAFStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.detection.DAOStarFinder`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
# For consistency with IRAFStarFinder, allow minsep_fwhm to be
# passed in and convert to pixels. DAOStarFinder never natively
# supported minsep_fwhm in any version of photutils.
# For photutils 3.0+, _translate_starfinder_kwargs handles this.
if not PHOTUTILS_GE_3 and "minsep_fwhm" in kwargs:
min_sep_pix = max(2, int(kwargs.pop("minsep_fwhm") * kernel_fwhm + 0.5))
kwargs["min_separation"] = min_sep_pix
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(DAOStarFinder).parameters)
kwargs = _translate_starfinder_kwargs(
kwargs,
kernel_fwhm,
sharplo_default=0.2,
sharphi_default=1.0,
roundlo_default=-1.0,
roundhi_default=1.0,
)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
threshold = np.median(threshold_img) # only float is supported, not per-pixel value
starfind = DAOStarFinder(threshold, kernel_fwhm, **finder_dict)
sources = starfind(data, mask=mask)
return sources, None
[docs]
def make_tweakreg_catalog(
model,
snr_threshold,
kernel_fwhm,
bkg_boxsize=400,
coverage_mask=None,
starfinder_name="iraf",
starfinder_kwargs=None,
):
"""
Create a catalog of point-line sources to be used for image alignment in tweakreg.
Parameters
----------
model : `~stdatamodels.jwst.datamodels.ImageModel`
The input `~stdatamodels.jwst.datamodels.ImageModel` of a single image. The input image is
assumed to be background subtracted.
snr_threshold : float
The signal-to-noise ratio per pixel above the ``background`` for
which to consider a pixel as possibly being part of a source.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
used to convolve the image.
bkg_boxsize : float, optional
The background mesh box size in pixels.
coverage_mask : array-like (bool), optional
A boolean mask with the same shape as ``model.data``, where a `True`
value indicates the corresponding element of ``model.data`` is masked.
Masked pixels will not be included in any source.
starfinder_name : str, optional
The ``photutils`` star finder to use. Options are 'dao', 'iraf', or 'segmentation':
- 'dao': `photutils.detection.DAOStarFinder`
- 'iraf': `photutils.detection.IRAFStarFinder`
- 'segmentation': `photutils.segmentation.SourceFinder`
starfinder_kwargs : dict, optional
Additional keyword arguments to be passed to the star finder.
for 'segmentation', these can be kwargs to `photutils.segmentation.SourceFinder`
and/or `photutils.segmentation.SourceCatalog`.
for 'dao' or 'iraf', these are kwargs to `photutils.detection.DAOStarFinder`
or `photutils.detection.IRAFStarFinder`, respectively.
Defaults are as stated in the docstrings of those functions unless noted here:
- 'dao': fwhm=2.5
- 'iraf': fwhm=2.5
- 'segmentation': npixels=10, progress_bar=False
For 'dao' and 'iraf', either old-style or new-style keyword
argument names may be used regardless of the installed
photutils version. They are automatically translated to
match the installed version:
- ``sharplo``, ``sharphi`` <-> ``sharpness_range``
- ``roundlo``, ``roundhi`` <-> ``roundness_range``
- ``peakmax`` <-> ``peak_max``
- ``brightest`` <-> ``n_brightest``
- ``minsep_fwhm`` -> ``min_separation``
If both old-style and new-style keyword arguments are given
for the same parameter, the style matching the installed
photutils version takes precedence.
Note that ``minsep_fwhm`` is converted to pixels using the
formula ``min_separation = max(2, int(minsep_fwhm *
kernel_fwhm + 0.5))`` to be consistent with the way that
IRAFStarFinder handles this parameter.
Returns
-------
catalog : `~astropy.table.Table`
An astropy Table containing the source catalog.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
if not isinstance(model, ImageModel):
raise TypeError("The input model must be an ImageModel.")
if starfinder_kwargs is None:
starfinder_kwargs = {}
if starfinder_name.lower() in ["dao", "daostarfinder"]:
starfinder = _dao_starfinder_wrapper
elif starfinder_name.lower() in ["iraf", "irafstarfinder"]:
starfinder = _iraf_starfinder_wrapper
elif starfinder_name.lower() in ["segmentation", "sourcefinder"]:
starfinder = _sourcefinder_wrapper
else:
raise ValueError(f"Unknown starfinder type: {starfinder_name}")
# Mask the non-imaging area, e.g. reference pixels and MIRI non-science area
if coverage_mask is None:
coverage_mask = (
(dqflags.pixel["NON_SCIENCE"] + dqflags.pixel["DO_NOT_USE"]) & model.dq
).astype(bool)
# Compute the background and threshold image
try:
bkg = JWSTBackground(model.data, box_size=bkg_boxsize, coverage_mask=coverage_mask)
with warnings.catch_warnings():
# suppress warning about NaNs being automatically masked - this is desired
warnings.simplefilter("ignore", AstropyUserWarning)
threshold_img = snr_threshold * bkg.background_rms
data = model.data - bkg.background
except ValueError as e:
log.warning(f"Error determining sky background: {e.args[0]}")
sources = _empty_table()
_rename_columns(sources)
return sources, None
# Run the star finder
with warnings.catch_warnings(): # handle lack of detections later
warnings.filterwarnings(
"ignore", category=NoDetectionsWarning, message="No sources were found"
)
sources, segmentation_image = starfinder(
data,
threshold_img,
kernel_fwhm,
mask=coverage_mask,
**starfinder_kwargs,
)
if not sources:
log.warning("No sources found in the image.")
sources = _empty_table()
_rename_columns(sources)
return sources, segmentation_image
def _empty_table():
"""
Return an empty table with the correct column names and dtypes.
Returns
-------
`~astropy.table.Table`
An empty table with the correct column names and dtypes.
"""
default_names = ["id", "xcentroid", "ycentroid", "flux"]
default_dtypes = (int, float, float, float)
return Table(names=default_names, dtype=default_dtypes).copy()