Source code for xlens.analysis.neff

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "NeffMultibandPipeConnections",
    "NeffMultibandPipeConfig",
    "NeffMultibandPipe",
    "NeffSummaryMultibandPipeConnections",
    "NeffSummaryMultibandPipeConfig",
    "NeffSummaryMultibandPipe",
]

import logging
from typing import Any

import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.pex.config import Field, FieldValidationError, ListField
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.skymap import BaseSkyMap
from lsst.utils.logging import LsstLogAdapter


[docs] class NeffMultibandPipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", "dataType": "", "version": "", }, ):
[docs] skyMap = cT.Input( doc="SkyMap to use in processing", name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, storageClass="SkyMap", dimensions=("skymap",), )
[docs] src = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_0_rot0_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] summary = cT.Output( doc="Summary statistics", name="{coaddName}_coadd_anacal_neff_flux_{dataType}{version}", storageClass="ArrowAstropy", dimensions=("skymap", "tract", "patch"), )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class NeffMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=NeffMultibandPipeConnections, ):
[docs] do_correct_selection_bias = Field[bool]( doc="Whether correct for selection bias", default=True, )
[docs] shear_name = Field[str]( doc="the shear component to test", default="g1", )
[docs] shear_value = Field[float]( doc="absolute value of the shear", default=0.02, )
[docs] shape_name = Field[str]( doc="ellipticity column name", default="fpfs_e1", )
[docs] dshape_name = Field[str]( doc="ellipticity's shear response column name", default="fpfs_de1", )
[docs] weight_name = Field[str]( doc="weight column name", default="fpfs_w", )
[docs] dweight_name = Field[str]( doc="weight's shear response column name", default="fpfs_dw", )
[docs] flux_name = Field[str]( doc="flux column name", default="fpfs_m", )
[docs] dflux_name = Field[str]( doc="flux's shear response column name", default="fpfs_dm", )
[docs] flux_cuts = ListField[float]( doc="lower limit of flux", default=[6.0, 12.0, 18.0, 24.0, 30.0], )
[docs] bound = Field[int]( doc="Sources to be removed if too close to boundary", default=40, )
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing") if self.shear_name not in ["g1", "g2"]: raise FieldValidationError( self.__class__.shear_name, self, "shear_name can only be 'g1' or 'g2'", ) if self.shear_value < 0.0 or self.shear_value > 0.10: raise FieldValidationError( self.__class__.shear_value, self, "shear_value should be in [0., 0.10]", )
[docs] class NeffMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsNeffTask"
[docs] ConfigClass = NeffMultibandPipeConfig
def __init__( self, *, config: NeffMultibandPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, NeffMultibandPipeConfig)
[docs] self.sname = self.config.shear_name
[docs] self.svalue = self.config.shear_value
[docs] self.ename = self.config.shape_name
ins = "_dg" + self.ename[-1]
[docs] self.egname = self.config.dshape_name + ins
[docs] self.wname = self.config.weight_name
[docs] self.wgname = self.config.dweight_name + ins
[docs] self.fname = self.config.flux_name
[docs] self.fgname = self.config.dflux_name + ins
return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, NeffMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) tract = butlerQC.quantum.dataId["tract"] patch = butlerQC.quantum.dataId["patch"] skyMap = inputs["skyMap"] patch_info = skyMap[tract][patch] outputs = self.run(inputs["src"], patch_info) butlerQC.put(outputs, outputRefs) return
[docs] def measure_shear_flux_cut(self, src, flux_min): assert isinstance(self.config, NeffMultibandPipeConfig) en = self.ename egn = self.egname wname = self.wname wgname = self.wgname fname = self.fname fgname = self.fgname msk = (~np.isnan(src[fgname])) & (~np.isnan(src[egn])) src = src[msk] tmp = src[src[fname] > flux_min] ngal = len(msk) ell = np.sum(tmp[en] * tmp[wname]) res = np.sum(tmp[egn] * tmp[wname] + tmp[en] * tmp[wgname]) if self.config.do_correct_selection_bias: dg = 0.02 # selection tmp = src[(src[fname] + dg * src[fgname]) > flux_min] ellp = np.sum(tmp[en] * tmp[wname]) del tmp # selection tmp = src[(src[fname] - dg * src[fgname]) > flux_min] ellm = np.sum(tmp[en] * tmp[wname]) res_sel = (ellp - ellm) / 2.0 / dg del tmp else: res_sel = 0.0 return ell, (res + res_sel), ngal
[docs] def run(self, src, patch_info): assert isinstance(self.config, NeffMultibandPipeConfig) bbox = patch_info.getOuterBBox() pixel_scale = ( patch_info.getWcs().getPixelScale().asDegrees() * 60 # arcmin ) area = ( (bbox.getHeight() - 2.0 * self.config.bound) * (bbox.getWidth() - 2.0 * self.config.bound) * pixel_scale**2.0 ) # arcmin ncuts = len(self.config.flux_cuts) data_type = [ ("up", "f8"), ("down", "f8"), ("ngal", "f8"), ("area", "f8"), ] summary = np.zeros(ncuts, dtype=data_type) for ic, flux_min in enumerate(self.config.flux_cuts): ell, res, ngal = self.measure_shear_flux_cut(src, flux_min) summary["up"][ic] = ell summary["down"][ic] = res summary["ngal"][ic] = ngal summary["area"][ic] = area return Struct(summary=summary)
[docs] class NeffSummaryMultibandPipeConnections( PipelineTaskConnections, dimensions=(), defaultTemplates={ "coaddName": "deep", "dataType": "", "version": "", }, ):
[docs] summary_list = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_coadd_anacal_neff_flux_{dataType}{version}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", multiple=True, deferLoad=True, )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class NeffSummaryMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=NeffSummaryMultibandPipeConnections, ):
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing")
[docs] class NeffSummaryMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsNeffSummaryTask"
[docs] ConfigClass = NeffSummaryMultibandPipeConfig
def __init__( self, *, config: NeffSummaryMultibandPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, NeffSummaryMultibandPipeConfig) return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, NeffSummaryMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) self.run(**inputs) return
[docs] def run(self, *, summary_list): assert isinstance(self.config, NeffSummaryMultibandPipeConfig) up = [] down = [] ngal = [] for res in summary_list: res = res.get() up.append(np.array(res["up"])) down.append(np.array(res["down"])) ngal.append(np.array(res["ngal"])) area = res["area"][0] up = np.vstack(up) down = np.vstack(down) ngal = np.vstack(ngal) std = np.std(up / down, axis=0) neff = (0.26 / std) ** 2.0 / area print("neff: ", neff) return