# Source code for xlens.process_pipe.systematics_multiband

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Names exported by this module: the task, its config and its connections.
__all__ = [
    "SystematicsMultibandPipeConfig",
    "SystematicsMultibandPipe",
    "SystematicsMultibandPipeConnections",
]

import logging
from typing import Any

import lsst.afw.image as afwImage
import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.geom import Box2I, Extent2I, Point2D, Point2I
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import Field, FieldValidationError
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.skymap import BaseSkyMap
from lsst.utils.logging import LsstLogAdapter

from ..utils.image import resize_array, subpixel_shift


class SystematicsMultibandPipeConnections(
    PipelineTaskConnections,
    dimensions=("tract", "patch", "band", "skymap"),
    defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"},
):
    """Butler connections of the per-band coadd systematics task.

    Inputs are the sky map, the calibrated coadd exposure and its
    measurement catalog; outputs are three ``ImageF`` stamps (noise
    correlation, PSF model and star image), one set per
    (tract, patch, band).
    """

    skyMap = cT.Input(
        doc="SkyMap to use in processing",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    exposure = cT.Input(
        doc="Input coadd image",
        name="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "band", "skymap"),
    )
    inputCatalog = cT.Input(
        doc=("original measurement catalog"),
        name="{inputCoaddName}Coadd_meas",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "band", "skymap"),
    )
    outputNoiseCorr = cT.Output(
        doc="noise correlation function",
        name="{outputCoaddName}_coadd_systematics_noisecorr",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ImageF",
    )
    # The two docs below used to repeat "noise correlation function",
    # copied from ``outputNoiseCorr``; they now describe the actual product.
    outputPsfCentered = cT.Output(
        doc="PSF model image at the position of a randomly selected star",
        name="{outputCoaddName}_coadd_systematics_psfcentered",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ImageF",
    )
    outputStarCentered = cT.Output(
        doc=(
            "image stamp of the randomly selected star, shifted by its "
            "sub-pixel centroid offset"
        ),
        name="{outputCoaddName}_coadd_systematics_starcentered",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ImageF",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
class SystematicsMultibandPipeConfig(
    PipelineTaskConfig,
    pipelineConnections=SystematicsMultibandPipeConnections,
):
    """Configuration of the per-band coadd systematics task."""

    psfCache = Field[int](
        default=100,
        doc="Size of psfCache",
    )
    npix = Field[int](
        default=49,
        doc="number of pixels for the length of stamp",
    )
    star_snr_min = Field[float](
        default=100.0,
        doc="minimum snr threshold of stars",
    )
    idGenerator = SkyMapIdGeneratorConfig.make_field()

    def validate(self):
        """Run the base validation, then require an odd stamp size.

        Raises
        ------
        FieldValidationError
            If ``npix`` is an even number.
        """
        super().validate()
        if self.npix % 2 != 0:
            # An odd stamp length has a single central pixel.
            return
        raise FieldValidationError(
            type(self).npix, self, "npix should be odd number"
        )
class SystematicsMultibandPipe(PipelineTask):
    """Measure noise-correlation and PSF systematics stamps of a coadd.

    For one coadd exposure the task produces

    * the normalised noise auto-correlation function on an
      ``npix`` x ``npix`` stamp, and
    * the PSF model image and the observed image of one randomly selected
      high signal-to-noise reserved star.
    """

    # NOTE(review): "FpfsTask" looks copied from another task; it is left
    # unchanged here because the name may be relied upon elsewhere.
    _DefaultName = "FpfsTask"
    ConfigClass = SystematicsMultibandPipeConfig

    def __init__(
        self,
        *,
        config: SystematicsMultibandPipeConfig | None = None,
        log: logging.Logger | LsstLogAdapter | None = None,
        initInputs: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        super().__init__(
            config=config, log=log, initInputs=initInputs, **kwargs
        )
        assert isinstance(self.config, SystematicsMultibandPipeConfig)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, derive the random seed, run and store outputs.

        The seed handed to `run` is the ``catalog_id`` of the ID generator
        built from the quantum data ID.
        """
        assert isinstance(self.config, SystematicsMultibandPipeConfig)
        # Fetch every input dataset declared by the connections.
        inputs = butlerQC.get(inputRefs)
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs["seed"] = idGenerator.catalog_id
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, **kwargs):
        """Compute the three systematics images.

        Parameters
        ----------
        **kwargs
            Must contain ``exposure``, ``inputCatalog`` and ``seed``. Any
            other entry (e.g. ``skyMap``) is accepted and ignored.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct with ``outputNoiseCorr``, ``outputPsfCentered`` and
            ``outputStarCentered``. The last two are `None` when no star
            passes the selection.
        """
        noise_corr = self.get_noise_corr(kwargs["exposure"])
        psf_image, star_image = self.get_psf_systematics(
            kwargs["exposure"],
            kwargs["inputCatalog"],
            kwargs["seed"],
        )
        return Struct(
            outputNoiseCorr=noise_corr,
            outputPsfCentered=psf_image,
            outputStarCentered=star_image,
        )

    @staticmethod
    def _central_autocorrelation(array, npixl, npixr):
        """Return the central stamp of the FFT auto-correlation of ``array``.

        The stamp spans ``npixl`` pixels before and ``npixr`` pixels from
        the centre of the shifted correlation image, along both axes.
        """
        ny, nx = array.shape
        corr = np.fft.fftshift(
            np.fft.ifft2(np.abs(np.fft.fft2(array)) ** 2.0)
        ).real
        return corr[
            ny // 2 - npixl : ny // 2 + npixr,
            nx // 2 - npixl : nx // 2 + npixr,
        ]

    def get_noise_corr(self, exposure):
        """Estimate the normalised noise correlation function.

        Pixels with any mask bit set are zeroed. The auto-correlation of
        the masked image is divided by the auto-correlation of the mask
        window, normalised to one at zero lag, and truncated to a circle
        of 20 pixels radius.

        Parameters
        ----------
        exposure
            Coadd exposure providing image, mask and variance planes.

        Returns
        -------
        noise_corr : `lsst.afw.image.ImageF`
            ``npix`` x ``npix`` image of the correlation function.

        Raises
        ------
        ValueError
            If the exposure has no unmasked pixel, or if the mean variance
            of the unmasked pixels is not a finite number of at least
            ``1e-12``.
        """
        assert isinstance(self.config, SystematicsMultibandPipeConfig)
        npix = self.config.npix
        masked_image = exposure.getMaskedImage()

        unmasked = masked_image.mask.array == 0
        if not unmasked.any():
            # Without this guard the average below is NaN, which slips
            # through the ``<`` test and yields a NaN correlation image.
            raise ValueError(
                "the exposure has no unmasked pixels to estimate the noise."
            )
        noise_variance = np.average(masked_image.variance.array[unmasked])
        if not np.isfinite(noise_variance) or noise_variance < 1e-12:
            raise ValueError(
                "the estimated image noise variance should be positive."
            )

        window_array = unmasked.astype(np.float32)
        noise_array = (
            np.asarray(masked_image.image.array, dtype=np.float32)
            * window_array
        )

        pad_width = ((10, 10), (10, 10))  # ((top, bottom), (left, right))
        window_array = np.pad(
            window_array,
            pad_width=pad_width,
            mode="constant",
            constant_values=0.0,
        )
        noise_array = np.pad(
            noise_array,
            pad_width=pad_width,
            mode="constant",
            constant_values=0.0,
        )

        npixl = int(npix // 2)
        npixr = int(npix // 2 + 1)
        noise_array = self._central_autocorrelation(noise_array, npixl, npixr)
        window_corr = self._central_autocorrelation(
            window_array, npixl, npixr
        )
        # Divide out the number of contributing pixel pairs at each lag.
        noise_array = noise_array / window_corr
        # Normalise so that the zero-lag (central) value is one.
        noise_array = noise_array / noise_array[npixl, npixl]

        # Keep only lags within 20 pixels of the stamp centre.
        y, x = np.ogrid[:npix, :npix]
        center = npix // 2
        distance_from_center = np.sqrt((x - center) ** 2 + (y - center) ** 2)
        circle = (distance_from_center <= 20.0).astype(int)
        noise_array = noise_array * circle

        noise_corr = afwImage.ImageF(npix, npix)
        noise_corr.array[:, :] = noise_array
        return noise_corr

    def get_psf_systematics(self, exposure, inputCatalog, seed):
        """Extract the PSF model and image stamp of one random star.

        Candidates are sources flagged ``calib_psf_reserved`` and
        ``detect_isPrimary`` whose 3-pixel aperture signal-to-noise exceeds
        ``star_snr_min`` and whose stamp lies inside the exposure.

        Parameters
        ----------
        exposure
            Coadd exposure; it is not modified.
        inputCatalog
            Measurement catalog of the exposure.
        seed : `int`
            Seed of the random choice among the candidate stars.

        Returns
        -------
        psf_image : `lsst.afw.image.ImageF` or `None`
            PSF model evaluated at the integer pixel position of the star,
            resized to ``npix`` x ``npix``.
        star_image : image or `None`
            ``npix`` x ``npix`` stamp around the star, shifted by minus the
            fractional part of its centroid.

        Both values are `None` when no star passes the selection.
        """
        assert isinstance(self.config, SystematicsMultibandPipeConfig)
        npix = self.config.npix
        npixl = int(npix // 2)
        npixr = int(npix // 2 + 1)

        catalog = inputCatalog.asAstropy().as_array()
        msk = catalog["calib_psf_reserved"] & catalog["detect_isPrimary"]
        catalog = catalog[msk]
        snr = (
            catalog["base_CircularApertureFlux_3_0_instFlux"]
            / catalog["base_CircularApertureFlux_3_0_instFluxErr"]
        )
        bbox = exposure.getBBox()
        xmin_exp, ymin_exp = bbox.getMinX(), bbox.getMinY()
        xmax_exp, ymax_exp = bbox.getMaxX(), bbox.getMaxY()
        # Require the whole stamp to fit inside the exposure.
        msk2 = (
            (catalog["base_SdssShape_x"] > xmin_exp + npixl)
            & (catalog["base_SdssShape_y"] > ymin_exp + npixl)
            & (catalog["base_SdssShape_x"] < xmax_exp - npixr)
            & (catalog["base_SdssShape_y"] < ymax_exp - npixr)
            & (snr > self.config.star_snr_min)
        )
        catalog = catalog[msk2]
        nstars = len(catalog)
        if nstars < 1:
            return None, None

        # A local generator draws the same index as seeding the global
        # NumPy RNG would, without altering process-wide random state.
        rng = np.random.RandomState(seed)
        src = catalog[rng.randint(0, nstars)]
        x_int = int(src["base_SdssShape_x"])
        y_int = int(src["base_SdssShape_y"])

        # Collect the PSF image
        exposure.getPsf().setCacheCapacity(self.config.psfCache)
        lsst_psf = exposure.getPsf()
        psf_array = lsst_psf.computeImage(Point2D(x_int, y_int)).getArray()
        psf_array = resize_array(psf_array, (npix, npix))
        psf_image = afwImage.ImageF(npix, npix)
        psf_image.array[:, :] = psf_array

        # Collect the star image
        stamp_bbox = Box2I(
            Point2I(x_int - npixl, y_int - npixl),
            Extent2I(npix, npix),
        )
        # Deep copy (last argument): a shallow sub-exposure shares pixels
        # with its parent, so writing the shifted star back would
        # overwrite the input exposure.
        star_image = exposure.Factory(
            exposure, stamp_bbox, afwImage.PARENT, True
        ).getImage()
        star_array = star_image.getArray()
        offset_x = src["base_SdssShape_x"] - x_int
        offset_y = src["base_SdssShape_y"] - y_int
        star_array = subpixel_shift(star_array, -offset_x, -offset_y)
        star_image.array[:, :] = star_array
        return psf_image, star_image