Source code for xlens.process_pipe.fpfs_joint

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "FpfsJointPipeConfig",
    "FpfsJointPipe",
    "FpfsJointPipeConnections",
]

import logging
from typing import Any

import lsst.pipe.base.connectionTypes as cT
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import ConfigurableField, Field
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.utils.logging import LsstLogAdapter

from ..processor.fpfs import FpfsMeasurementTask


[docs] class FpfsJointPipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", }, ):
[docs] exposure = cT.Input( doc="Input coadd image", name="{coaddName}_coadd", storageClass="ExposureF", dimensions=("skymap", "tract", "patch", "band"), multiple=True, deferLoad=True, )
[docs] noise_corr = cT.Input( doc="noise correlation function", name="{coaddName}_coadd_systematics_noisecorr", storageClass="ImageF", dimensions=("skymap", "tract", "patch", "band"), minimum=0, multiple=True, deferLoad=True, )
[docs] detect_catalog = cT.Output( doc="Source catalog with joint detection and measurement", name="{coaddName}_coadd_anacal_detect", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class FpfsJointPipeConfig( PipelineTaskConfig, pipelineConnections=FpfsJointPipeConnections, ):
[docs] fpfs = ConfigurableField( target=FpfsMeasurementTask, doc="Fpfs Source Measurement Task", )
[docs] psfCache = Field[int]( doc="Size of PSF cache", default=100, )
[docs] idGenerator = SkyMapIdGeneratorConfig.make_field()
[docs] use_truth_detection = Field[bool]( doc="whether to use truth catalog as detection", default=False, )
[docs] use_dm_detection = Field[bool]( doc="whether to use dm catalog as detection", default=False, )
[docs] def validate(self): super().validate()
[docs] def setDefaults(self): super().setDefaults() self.fpfs.sigma_shapelets1 = -1 self.fpfs.sigma_shapelets2 = -1 self.fpfs.do_compute_detect_weight = True
[docs] class FpfsJointPipe(PipelineTask):
[docs] _DefaultName = "FpfsJointPipe"
[docs] ConfigClass = FpfsJointPipeConfig
def __init__( self, *, config: FpfsJointPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, FpfsJointPipeConfig) self.makeSubtask("fpfs") return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, FpfsJointPipeConfig) inputs = butlerQC.get(inputRefs) exposure_handles = inputs["exposure"] exposure_handles_dict = { handle.dataId["band"]: handle for handle in exposure_handles } correlation_handles = inputs["noise_corr"] if len(correlation_handles) == 0: correlation_handles_dict = None else: correlation_handles_dict = { handle.dataId["band"]: handle for handle in correlation_handles } outputs = self.run( exposure_handles_dict=exposure_handles_dict, correlation_handles_dict=correlation_handles_dict, ) butlerQC.put(outputs, outputRefs) return
[docs] def run( self, *, exposure_handles_dict: dict, correlation_handles_dict: dict | None, ): assert isinstance(self.config, FpfsJointPipeConfig) band = "i" handle = exposure_handles_dict[band] exposure = handle.get() exposure.getPsf().setCacheCapacity(self.config.psfCache) if correlation_handles_dict is not None: handle = correlation_handles_dict[band] noise_corr = handle.get() else: noise_corr = None idGenerator = self.config.idGenerator.apply(handle.dataId) seed = idGenerator.catalog_id data = self.fpfs.prepare_data( band=band, exposure=exposure, seed=seed, noise_corr=noise_corr, detection=None, ) catalog = self.fpfs.run(**data) return Struct(detect_catalog=catalog)