# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
__all__ = [
"AnacalDetectPipeConfig",
"AnacalDetectPipe",
"AnacalDetectPipeConnections",
]
import logging
from typing import Any
import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import (
ConfigurableField,
Field,
FieldValidationError,
)
from lsst.pipe.base import (
PipelineTask,
PipelineTaskConfig,
PipelineTaskConnections,
Struct,
)
from lsst.skymap import BaseSkyMap
from lsst.utils.logging import LsstLogAdapter
from numpy.lib import recfunctions as rfn
from numpy.typing import NDArray
from ..processor.anacal import AnacalTask
from ..processor.fpfs import FpfsMeasurementTask
[docs]
class AnacalDetectPipeConnections(
PipelineTaskConnections,
dimensions=("skymap", "tract", "patch"),
defaultTemplates={
"coaddName": "deep",
},
):
[docs]
skyMap = cT.Input(
doc="SkyMap to use in processing",
name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
storageClass="SkyMap",
dimensions=("skymap",),
)
[docs]
exposure = cT.Input(
doc="Input coadd image",
name="{coaddName}_coadd",
storageClass="ExposureF",
dimensions=("skymap", "tract", "patch", "band"),
multiple=True,
deferLoad=True,
)
[docs]
noise_corr = cT.Input(
doc="noise correlation function",
name="deep_coadd_systematics_noisecorr",
storageClass="ImageF",
dimensions=("skymap", "tract", "patch", "band"),
minimum=0,
multiple=True,
deferLoad=True,
)
[docs]
output_catalog = cT.Output(
doc="Source catalog with joint detection and measurement",
name="{coaddName}_coadd_anacal_detect",
dimensions=("skymap", "tract", "patch"),
storageClass="ArrowAstropy",
)
def __init__(self, *, config=None):
super().__init__(config=config)
[docs]
class AnacalDetectPipeConfig(
PipelineTaskConfig,
pipelineConnections=AnacalDetectPipeConnections,
):
[docs]
anacal = ConfigurableField(
target=AnacalTask,
doc="AnaCal Task Detect",
)
[docs]
fpfs = ConfigurableField(
target=FpfsMeasurementTask,
doc="Fpfs Source Measurement Task",
)
[docs]
do_fpfs = Field[bool](
doc="Whether to drun fpfs task",
default=False,
)
[docs]
psfCache = Field[int](
doc="Size of PSF cache",
default=100,
)
[docs]
idGenerator = SkyMapIdGeneratorConfig.make_field()
[docs]
def validate(self):
super().validate()
if self.do_fpfs:
if self.fpfs.sigma_shapelets1 < 0.0:
raise FieldValidationError(
self.fpfs.fields["sigma_shapelets1"],
self,
"sigma_shapelets1 in a wrong range",
)
[docs]
def setDefaults(self):
super().setDefaults()
self.fpfs.do_compute_detect_weight = False
[docs]
class AnacalDetectPipe(PipelineTask):
[docs]
_DefaultName = "AnacalDetectPipe"
[docs]
ConfigClass = AnacalDetectPipeConfig
def __init__(
self,
*,
config: AnacalDetectPipeConfig | None = None,
log: logging.Logger | LsstLogAdapter | None = None,
initInputs: dict[str, Any] | None = None,
**kwargs: Any,
):
super().__init__(
config=config, log=log, initInputs=initInputs, **kwargs
)
assert isinstance(self.config, AnacalDetectPipeConfig)
self.makeSubtask("anacal")
if self.config.do_fpfs:
self.makeSubtask("fpfs")
return
[docs]
def runQuantum(self, butlerQC, inputRefs, outputRefs):
assert isinstance(self.config, AnacalDetectPipeConfig)
inputs = butlerQC.get(inputRefs)
tract = int(butlerQC.quantum.dataId["tract"])
patch = int(butlerQC.quantum.dataId["patch"])
exposure_handles = inputs["exposure"]
exposure_handles_dict = {
handle.dataId["band"]: handle for handle in exposure_handles
}
correlation_handles = inputs["noise_corr"]
if len(correlation_handles) == 0:
correlation_handles_dict = None
else:
correlation_handles_dict = {
handle.dataId["band"]: handle for handle in correlation_handles
}
skyMap = inputs["skyMap"]
outputs = self.run(
exposure_handles_dict=exposure_handles_dict,
correlation_handles_dict=correlation_handles_dict,
skyMap=skyMap,
tract=tract,
patch=patch,
)
butlerQC.put(outputs, outputRefs)
return
[docs]
def run_measure(self, data):
assert isinstance(self.config, AnacalDetectPipeConfig)
out = []
catalog = self.anacal.run(**data)
out.append(catalog)
if self.config.do_fpfs:
data["detection"] = catalog
out.append(
self.fpfs.run(**data)
)
return rfn.merge_arrays(out, flatten=True)
[docs]
def run(
self,
*,
exposure_handles_dict: dict,
correlation_handles_dict: dict | None,
skyMap,
tract: int,
patch: int,
seed_offset: int = 0,
mask_array: NDArray | None = None,
**kwargs,
):
assert isinstance(self.config, AnacalDetectPipeConfig)
band = "i"
handle = exposure_handles_dict[band]
exposure = handle.get()
exposure.getPsf().setCacheCapacity(self.config.psfCache)
if correlation_handles_dict is not None:
noise_corr = correlation_handles_dict[band].get().getArray()
variance = np.amax(noise_corr)
noise_corr = noise_corr / variance
ny, nx = noise_corr.shape
assert noise_corr[ny // 2, nx // 2] == 1
self.log.debug("With correlation, variance:", variance)
else:
noise_corr = None
idGenerator = self.config.idGenerator.apply(handle.dataId)
seed = idGenerator.catalog_id + seed_offset
data = self.anacal.prepare_data(
exposure=exposure,
band=band,
seed=seed,
noise_corr=noise_corr,
detection=None,
skyMap=skyMap,
tract=tract,
patch=patch,
mask_array=mask_array,
)
catalog = self.run_measure(data)
return Struct(output_catalog=catalog)