Source code for xlens.processor.fpfs

"""FPFS shape measurement task wrapping :mod:`anacal.fpfs`.

Provides :class:`FpfsMeasurementTask`, a Rubin-style ``Task`` that measures
FPFS shapelet moments from coadd exposures and returns structured catalogs
ready for shear estimation.
"""

from typing import Any

import anacal
import numpy as np
from lsst.afw.image import ExposureF
from lsst.pex.config import Config, Field, FieldValidationError, ListField
from lsst.pipe.base import Task
from numpy.typing import NDArray

from .. import utils
from ..utils.random import num_rot


[docs] class FpfsMeasurementConfig(Config): """Configuration for :class:`FpfsMeasurementTask`."""
[docs] npix = Field[int]( doc="number of pixels in stamp [pixel]", default=64, )
[docs] bound = Field[int]( doc="Sources to be removed if too close to boundary [pixel]", default=32, )
[docs] sigma_shapelets = Field[float]( doc="Shapelet's Gaussian kernel size for detection [arcsec]", default=0.52, )
[docs] sigma_shapelets1 = Field[float]( doc="Shapelet's Gaussian kernel size for measurement [arcsec]", optional=True, default=-1, )
[docs] sigma_shapelets2 = Field[float]( doc="Shapelet's Gaussian kernel for the second measurement [arcsec]", optional=True, default=-1, )
[docs] snr_min = Field[float]( doc="Shapelet's Gaussian kernel for the second measurement", optional=True, default=12.0, )
[docs] r2_min = Field[float]( doc="Shapelet's Gaussian kernel for the second measurement", optional=True, default=0.1, )
[docs] pthres = Field[float]( doc="peak detection threshold", default=0.12, )
[docs] kmax_thres = Field[float]( doc="threshold to determine the maximum k in Fourier space", default=1e-12, )
[docs] do_noise_bias_correction = Field[bool]( doc="whether to doulbe the noise for noise bias correction", default=True, )
[docs] do_compute_detect_weight = Field[bool]( doc="whether to compute detection mode", default=True, )
[docs] return_only_linear_modes = Field[bool]( doc="whether only return linear modes", default=False, )
[docs] psf_model_type = Field[str]( doc="type of psf model (choose from object, block, patch)", default="patch", )
[docs] badMaskPlanes = ListField[str]( doc="Mask planes used to reject bad pixels.", default=[], )
[docs] noiseId = Field[int]( doc="Noise realization id", default=0, )
[docs] rotId = Field[int]( doc="rotation id", default=0, )
[docs] def validate(self): super().validate() if self.sigma_shapelets > 2.0 or self.sigma_shapelets < 0.0: raise FieldValidationError( self.__class__.sigma_shapelets, self, "sigma_shapelets in a wrong range", ) if self.sigma_shapelets1 > 2.0: raise FieldValidationError( self.__class__.sigma_shapelets1, self, "sigma_shapelets1 in a wrong range", ) if self.sigma_shapelets2 > 2.0: raise FieldValidationError( self.__class__.sigma_shapelets2, self, "sigma_shapelets2 in a wrong range", ) if self.noiseId < 0: raise FieldValidationError( self.__class__.noiseId, self, "We require noiseId >=0", ) if self.rotId >= num_rot: raise FieldValidationError( self.__class__.rotId, self, "rotId needs to be smaller than 2", )
[docs] def setDefaults(self): super().setDefaults()
[docs] class FpfsMeasurementTask(Task): """Measure FPFS shapelet observables from coadd image data. Wraps :func:`anacal.fpfs.process_image` behind the Rubin ``Task`` interface. Call :meth:`prepare_data` to extract arrays from an LSST ``ExposureF``, then :meth:`run` to perform the measurement. """
[docs] _DefaultName = "FpfsMeasurementTask"
[docs] ConfigClass = FpfsMeasurementConfig
def __init__(self, **kwargs: Any): super().__init__(**kwargs) assert isinstance(self.config, FpfsMeasurementConfig)
[docs] self.fpfs_config = anacal.fpfs.FpfsConfig( npix=self.config.npix, kmax_thres=self.config.kmax_thres, sigma_shapelets=self.config.sigma_shapelets, sigma_shapelets1=self.config.sigma_shapelets1, sigma_shapelets2=self.config.sigma_shapelets2, pthres=self.config.pthres, bound=self.config.bound, snr_min=self.config.snr_min, r2_min=self.config.r2_min, )
return
[docs] def run( self, *, pixel_scale: float, mag_zero: float, noise_variance: float, gal_array: NDArray, psf_array: NDArray, mask_array: NDArray, noise_array: NDArray | None, detection: NDArray | None, psf_object: utils.image.LsstPsf | None, base_column_name: str | None = None, begin_x: int = 0, begin_y: int = 0, **kwargs, ): """Run FPFS measurement on image arrays. Parameters ---------- pixel_scale : float Pixel scale in arcsec/pixel. mag_zero : float Magnitude zeropoint. noise_variance : float Per-pixel noise variance. gal_array : NDArray Galaxy image array. psf_array : NDArray PSF image array. mask_array : NDArray Bad-pixel mask array. noise_array : NDArray or None Noise realisation for noise-bias correction. detection : NDArray or None External detection catalog with ``x1_det``, ``x2_det`` columns (in arcsec). If *None*, peaks are detected internally. psf_object : LsstPsf or None Position-dependent PSF model. base_column_name : str or None Prefix prepended to all output column names. begin_x, begin_y : int Pixel origin offset for sub-images. Returns ------- np.ndarray Structured array of FPFS shape measurements. """ assert isinstance(self.config, FpfsMeasurementConfig) if detection is not None: fpfs_peaks_dtype = np.dtype([ ('y', np.float64), ('x', np.float64), ]) det = np.zeros(len(detection), dtype=fpfs_peaks_dtype) det["x"] = detection["x1_det"] / pixel_scale - begin_x det["y"] = detection["x2_det"] / pixel_scale - begin_y else: det = None catalog = anacal.fpfs.process_image( fpfs_config=self.fpfs_config, pixel_scale=pixel_scale, mag_zero=mag_zero, noise_variance=noise_variance, gal_array=gal_array, psf_array=psf_array, mask_array=mask_array, noise_array=noise_array, detection=det, psf_object=psf_object, do_compute_detect_weight=self.config.do_compute_detect_weight, base_column_name=base_column_name, return_only_linear_modes=self.config.return_only_linear_modes, pack_linear_modes=True, ) return catalog
[docs] def prepare_data( self, *, exposure: ExposureF, seed: int, band: str | None, noise_corr: NDArray | None = None, mask_array: NDArray | None = None, noise_array: NDArray | None = None, star_cat: NDArray | None = None, detection: NDArray | None = None, **kwargs, ): """Prepares the data from LSST exposure Args: exposure (ExposureF): LSST exposure seed (int): random seed noise_corr (NDArray): image noise correlation function (None) detection (NDArray | None): external detection catalog (None) Returns: (dict) """ assert isinstance(self.config, FpfsMeasurementConfig) lsst_bbox = exposure.getBBox() data = utils.image.prepare_data( band=band, exposure=exposure, seed=seed, noiseId=self.config.noiseId, rotId=self.config.rotId, npix=self.config.npix, noise_corr=noise_corr, do_noise_bias_correction=self.config.do_noise_bias_correction, badMaskPlanes=self.config.badMaskPlanes, star_cat=star_cat, mask_array=mask_array, noise_array=noise_array, detection=detection, ) if self.config.psf_model_type == "object": data["psf_object"] = utils.image.LsstPsf( psf=exposure.getPsf(), npix=self.config.npix, lsst_bbox=lsst_bbox, ) else: data["psf_object"] = None return data