"""FPFS shape measurement task wrapping :mod:`anacal.fpfs`.
Provides :class:`FpfsMeasurementTask`, a Rubin-style ``Task`` that measures
FPFS shapelet moments from coadd exposures and returns structured catalogs
ready for shear estimation.
"""
from typing import Any
import anacal
import numpy as np
from lsst.afw.image import ExposureF
from lsst.pex.config import Config, Field, FieldValidationError, ListField
from lsst.pipe.base import Task
from numpy.typing import NDArray
from .. import utils
from ..utils.random import num_rot
[docs]
class FpfsMeasurementConfig(Config):
"""Configuration for :class:`FpfsMeasurementTask`."""
[docs]
npix = Field[int](
doc="number of pixels in stamp [pixel]",
default=64,
)
[docs]
bound = Field[int](
doc="Sources to be removed if too close to boundary [pixel]",
default=32,
)
[docs]
sigma_shapelets = Field[float](
doc="Shapelet's Gaussian kernel size for detection [arcsec]",
default=0.52,
)
[docs]
sigma_shapelets1 = Field[float](
doc="Shapelet's Gaussian kernel size for measurement [arcsec]",
optional=True,
default=-1,
)
[docs]
sigma_shapelets2 = Field[float](
doc="Shapelet's Gaussian kernel for the second measurement [arcsec]",
optional=True,
default=-1,
)
[docs]
snr_min = Field[float](
doc="Shapelet's Gaussian kernel for the second measurement",
optional=True,
default=12.0,
)
[docs]
r2_min = Field[float](
doc="Shapelet's Gaussian kernel for the second measurement",
optional=True,
default=0.1,
)
[docs]
pthres = Field[float](
doc="peak detection threshold",
default=0.12,
)
[docs]
kmax_thres = Field[float](
doc="threshold to determine the maximum k in Fourier space",
default=1e-12,
)
[docs]
do_noise_bias_correction = Field[bool](
doc="whether to doulbe the noise for noise bias correction",
default=True,
)
[docs]
do_compute_detect_weight = Field[bool](
doc="whether to compute detection mode",
default=True,
)
[docs]
return_only_linear_modes = Field[bool](
doc="whether only return linear modes",
default=False,
)
[docs]
psf_model_type = Field[str](
doc="type of psf model (choose from object, block, patch)",
default="patch",
)
[docs]
badMaskPlanes = ListField[str](
doc="Mask planes used to reject bad pixels.",
default=[],
)
[docs]
noiseId = Field[int](
doc="Noise realization id",
default=0,
)
[docs]
rotId = Field[int](
doc="rotation id",
default=0,
)
[docs]
def validate(self):
super().validate()
if self.sigma_shapelets > 2.0 or self.sigma_shapelets < 0.0:
raise FieldValidationError(
self.__class__.sigma_shapelets,
self,
"sigma_shapelets in a wrong range",
)
if self.sigma_shapelets1 > 2.0:
raise FieldValidationError(
self.__class__.sigma_shapelets1,
self,
"sigma_shapelets1 in a wrong range",
)
if self.sigma_shapelets2 > 2.0:
raise FieldValidationError(
self.__class__.sigma_shapelets2,
self,
"sigma_shapelets2 in a wrong range",
)
if self.noiseId < 0:
raise FieldValidationError(
self.__class__.noiseId,
self,
"We require noiseId >=0",
)
if self.rotId >= num_rot:
raise FieldValidationError(
self.__class__.rotId,
self,
"rotId needs to be smaller than 2",
)
[docs]
def setDefaults(self):
super().setDefaults()
[docs]
class FpfsMeasurementTask(Task):
"""Measure FPFS shapelet observables from coadd image data.
Wraps :func:`anacal.fpfs.process_image` behind the Rubin ``Task``
interface. Call :meth:`prepare_data` to extract arrays from an
LSST ``ExposureF``, then :meth:`run` to perform the measurement.
"""
[docs]
_DefaultName = "FpfsMeasurementTask"
[docs]
ConfigClass = FpfsMeasurementConfig
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
assert isinstance(self.config, FpfsMeasurementConfig)
[docs]
self.fpfs_config = anacal.fpfs.FpfsConfig(
npix=self.config.npix,
kmax_thres=self.config.kmax_thres,
sigma_shapelets=self.config.sigma_shapelets,
sigma_shapelets1=self.config.sigma_shapelets1,
sigma_shapelets2=self.config.sigma_shapelets2,
pthres=self.config.pthres,
bound=self.config.bound,
snr_min=self.config.snr_min,
r2_min=self.config.r2_min,
)
return
[docs]
def run(
self,
*,
pixel_scale: float,
mag_zero: float,
noise_variance: float,
gal_array: NDArray,
psf_array: NDArray,
mask_array: NDArray,
noise_array: NDArray | None,
detection: NDArray | None,
psf_object: utils.image.LsstPsf | None,
base_column_name: str | None = None,
begin_x: int = 0,
begin_y: int = 0,
**kwargs,
):
"""Run FPFS measurement on image arrays.
Parameters
----------
pixel_scale : float
Pixel scale in arcsec/pixel.
mag_zero : float
Magnitude zeropoint.
noise_variance : float
Per-pixel noise variance.
gal_array : NDArray
Galaxy image array.
psf_array : NDArray
PSF image array.
mask_array : NDArray
Bad-pixel mask array.
noise_array : NDArray or None
Noise realisation for noise-bias correction.
detection : NDArray or None
External detection catalog with ``x1_det``, ``x2_det`` columns
(in arcsec). If *None*, peaks are detected internally.
psf_object : LsstPsf or None
Position-dependent PSF model.
base_column_name : str or None
Prefix prepended to all output column names.
begin_x, begin_y : int
Pixel origin offset for sub-images.
Returns
-------
np.ndarray
Structured array of FPFS shape measurements.
"""
assert isinstance(self.config, FpfsMeasurementConfig)
if detection is not None:
fpfs_peaks_dtype = np.dtype([
('y', np.float64),
('x', np.float64),
])
det = np.zeros(len(detection), dtype=fpfs_peaks_dtype)
det["x"] = detection["x1_det"] / pixel_scale - begin_x
det["y"] = detection["x2_det"] / pixel_scale - begin_y
else:
det = None
catalog = anacal.fpfs.process_image(
fpfs_config=self.fpfs_config,
pixel_scale=pixel_scale,
mag_zero=mag_zero,
noise_variance=noise_variance,
gal_array=gal_array,
psf_array=psf_array,
mask_array=mask_array,
noise_array=noise_array,
detection=det,
psf_object=psf_object,
do_compute_detect_weight=self.config.do_compute_detect_weight,
base_column_name=base_column_name,
return_only_linear_modes=self.config.return_only_linear_modes,
pack_linear_modes=True,
)
return catalog
[docs]
def prepare_data(
self,
*,
exposure: ExposureF,
seed: int,
band: str | None,
noise_corr: NDArray | None = None,
mask_array: NDArray | None = None,
noise_array: NDArray | None = None,
star_cat: NDArray | None = None,
detection: NDArray | None = None,
**kwargs,
):
"""Prepares the data from LSST exposure
Args:
exposure (ExposureF): LSST exposure
seed (int): random seed
noise_corr (NDArray): image noise correlation function (None)
detection (NDArray | None): external detection catalog (None)
Returns:
(dict)
"""
assert isinstance(self.config, FpfsMeasurementConfig)
lsst_bbox = exposure.getBBox()
data = utils.image.prepare_data(
band=band,
exposure=exposure,
seed=seed,
noiseId=self.config.noiseId,
rotId=self.config.rotId,
npix=self.config.npix,
noise_corr=noise_corr,
do_noise_bias_correction=self.config.do_noise_bias_correction,
badMaskPlanes=self.config.badMaskPlanes,
star_cat=star_cat,
mask_array=mask_array,
noise_array=noise_array,
detection=detection,
)
if self.config.psf_model_type == "object":
data["psf_object"] = utils.image.LsstPsf(
psf=exposure.getPsf(), npix=self.config.npix,
lsst_bbox=lsst_bbox,
)
else:
data["psf_object"] = None
return data