Source code for xlens.process_pipe.fpfs_force

#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "FpfsForcePipeConfig",
    "FpfsForcePipe",
    "FpfsForcePipeConnections",
]

import logging
from typing import Any

import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import ConfigurableField, Field, FieldValidationError
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.utils.logging import LsstLogAdapter
from numpy.lib import recfunctions as rfn

from ..processor.fpfs import FpfsMeasurementTask


[docs] class FpfsForcePipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", }, ):
[docs] input_catalog = cT.Input( doc="Source catalog with joint detection and measurement", name="{coaddName}_coadd_anacal_detect", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", multiple=False, )
[docs] exposure = cT.Input( doc="Input coadd image", name="{coaddName}_coadd", storageClass="ExposureF", dimensions=("skymap", "tract", "patch", "band"), multiple=True, deferLoad=True, )
[docs] noise_corr = cT.Input( doc="noise correlation function", name="deep_coadd_systematics_noisecorr", storageClass="ImageF", dimensions=("skymap", "tract", "patch", "band"), minimum=0, multiple=True, deferLoad=True, )
[docs] catalog = cT.Output( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_coadd_fpfs_force", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class FpfsForcePipeConfig( PipelineTaskConfig, pipelineConnections=FpfsForcePipeConnections, ):
[docs] fpfs = ConfigurableField( target=FpfsMeasurementTask, doc="Fpfs Source Measurement Task", )
[docs] psfCache = Field[int]( doc="Size of PSF cache", default=100, )
[docs] idGenerator = SkyMapIdGeneratorConfig.make_field()
[docs] def validate(self): super().validate() if self.fpfs.sigma_shapelets1 < 0.0: raise FieldValidationError( self.fpfs.__class__.sigma_shapelets1, self, "sigma_shapelets1 in a wrong range", )
[docs] def setDefaults(self): super().setDefaults() self.fpfs.do_compute_detect_weight = False
[docs] class FpfsForcePipe(PipelineTask):
[docs] _DefaultName = "FpfsForcePipe"
[docs] ConfigClass = FpfsForcePipeConfig
def __init__( self, *, config: FpfsForcePipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, FpfsForcePipeConfig) self.makeSubtask("fpfs") return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, FpfsForcePipeConfig) inputs = butlerQC.get(inputRefs) exposure_handles = inputs["exposure"] exposure_handles_dict = { handle.dataId["band"]: handle for handle in exposure_handles } correlation_handles = inputs["noise_corr"] if len(correlation_handles) == 0: correlation_handles_dict = None else: correlation_handles_dict = { handle.dataId["band"]: handle for handle in correlation_handles } outputs = self.run( detection=inputs["input_catalog"].as_array(), exposure_handles_dict=exposure_handles_dict, correlation_handles_dict=correlation_handles_dict, ) butlerQC.put(outputs, outputRefs) return
[docs] def run( self, *, detection, exposure_handles_dict: dict, correlation_handles_dict: dict | None, **kwargs, ): assert isinstance(self.config, FpfsForcePipeConfig) if detection is not None: anacal_colnames = [ "x1", "x2", "flux", "dflux_dg1", "dflux_dg2", "wsel", "dwsel_dg1", "dwsel_dg2", ] cat = rfn.repack_fields( detection[anacal_colnames] ) catalog = [cat] else: catalog = [] for band in exposure_handles_dict.keys(): handle = exposure_handles_dict[band] exposure = handle.get() exposure.getPsf().setCacheCapacity(self.config.psfCache) if correlation_handles_dict is not None: noise_corr = correlation_handles_dict[band].get().getArray() variance = np.amax(noise_corr) noise_corr = noise_corr / variance ny, nx = noise_corr.shape assert noise_corr[ny // 2, nx // 2] == 1 self.log.debug("With correlation, variance:", variance) else: noise_corr = None idGenerator = self.config.idGenerator.apply(handle.dataId) seed = idGenerator.catalog_id data = self.fpfs.prepare_data( band=band, exposure=exposure, seed=seed, noise_corr=noise_corr, detection=detection, ) res = self.fpfs.run(**data) colnames = res.dtype.names map_dict = {name: f"{band}_" + name for name in colnames} res = rfn.rename_fields(res, map_dict) catalog.append(res) del exposure, data catalog = rfn.merge_arrays(catalog, flatten=True) return Struct(catalog=catalog)