Source code for xlens.process_pipe.anacal_detect

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "AnacalDetectPipeConfig",
    "AnacalDetectPipe",
    "AnacalDetectPipeConnections",
]

import logging
from typing import Any

import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import (
    ConfigurableField,
    Field,
    FieldValidationError,
)
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.skymap import BaseSkyMap
from lsst.utils.logging import LsstLogAdapter
from numpy.lib import recfunctions as rfn
from numpy.typing import NDArray

from ..processor.anacal import AnacalTask
from ..processor.fpfs import FpfsMeasurementTask


[docs] class AnacalDetectPipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", }, ):
[docs] skyMap = cT.Input( doc="SkyMap to use in processing", name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, storageClass="SkyMap", dimensions=("skymap",), )
[docs] exposure = cT.Input( doc="Input coadd image", name="{coaddName}_coadd", storageClass="ExposureF", dimensions=("skymap", "tract", "patch", "band"), multiple=True, deferLoad=True, )
[docs] noise_corr = cT.Input( doc="noise correlation function", name="deep_coadd_systematics_noisecorr", storageClass="ImageF", dimensions=("skymap", "tract", "patch", "band"), minimum=0, multiple=True, deferLoad=True, )
[docs] output_catalog = cT.Output( doc="Source catalog with joint detection and measurement", name="{coaddName}_coadd_anacal_detect", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class AnacalDetectPipeConfig( PipelineTaskConfig, pipelineConnections=AnacalDetectPipeConnections, ):
[docs] anacal = ConfigurableField( target=AnacalTask, doc="AnaCal Task Detect", )
[docs] fpfs = ConfigurableField( target=FpfsMeasurementTask, doc="Fpfs Source Measurement Task", )
[docs] do_fpfs = Field[bool]( doc="Whether to drun fpfs task", default=False, )
[docs] psfCache = Field[int]( doc="Size of PSF cache", default=100, )
[docs] idGenerator = SkyMapIdGeneratorConfig.make_field()
[docs] def validate(self): super().validate() if self.do_fpfs: if self.fpfs.sigma_shapelets1 < 0.0: raise FieldValidationError( self.fpfs.fields["sigma_shapelets1"], self, "sigma_shapelets1 in a wrong range", )
[docs] def setDefaults(self): super().setDefaults() self.fpfs.do_compute_detect_weight = False
[docs] class AnacalDetectPipe(PipelineTask):
[docs] _DefaultName = "AnacalDetectPipe"
[docs] ConfigClass = AnacalDetectPipeConfig
def __init__( self, *, config: AnacalDetectPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, AnacalDetectPipeConfig) self.makeSubtask("anacal") if self.config.do_fpfs: self.makeSubtask("fpfs") return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, AnacalDetectPipeConfig) inputs = butlerQC.get(inputRefs) tract = int(butlerQC.quantum.dataId["tract"]) patch = int(butlerQC.quantum.dataId["patch"]) exposure_handles = inputs["exposure"] exposure_handles_dict = { handle.dataId["band"]: handle for handle in exposure_handles } correlation_handles = inputs["noise_corr"] if len(correlation_handles) == 0: correlation_handles_dict = None else: correlation_handles_dict = { handle.dataId["band"]: handle for handle in correlation_handles } skyMap = inputs["skyMap"] outputs = self.run( exposure_handles_dict=exposure_handles_dict, correlation_handles_dict=correlation_handles_dict, skyMap=skyMap, tract=tract, patch=patch, ) butlerQC.put(outputs, outputRefs) return
[docs] def run_measure(self, data): assert isinstance(self.config, AnacalDetectPipeConfig) out = [] catalog = self.anacal.run(**data) out.append(catalog) if self.config.do_fpfs: data["detection"] = catalog out.append( self.fpfs.run(**data) ) return rfn.merge_arrays(out, flatten=True)
[docs] def run( self, *, exposure_handles_dict: dict, correlation_handles_dict: dict | None, skyMap, tract: int, patch: int, seed_offset: int = 0, mask_array: NDArray | None = None, **kwargs, ): assert isinstance(self.config, AnacalDetectPipeConfig) band = "i" handle = exposure_handles_dict[band] exposure = handle.get() exposure.getPsf().setCacheCapacity(self.config.psfCache) if correlation_handles_dict is not None: noise_corr = correlation_handles_dict[band].get().getArray() variance = np.amax(noise_corr) noise_corr = noise_corr / variance ny, nx = noise_corr.shape assert noise_corr[ny // 2, nx // 2] == 1 self.log.debug("With correlation, variance:", variance) else: noise_corr = None idGenerator = self.config.idGenerator.apply(handle.dataId) seed = idGenerator.catalog_id + seed_offset data = self.anacal.prepare_data( exposure=exposure, band=band, seed=seed, noise_corr=noise_corr, detection=None, skyMap=skyMap, tract=tract, patch=patch, mask_array=mask_array, ) catalog = self.run_measure(data) return Struct(output_catalog=catalog)