Source code for xlens.analysis.selbias

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "SelBiasMultibandPipeConnections",
    "SelBiasMultibandPipeConfig",
    "SelBiasMultibandPipe",
    "SelBiasSummaryMultibandPipeConnections",
    "SelBiasSummaryMultibandPipeConfig",
    "SelBiasSummaryMultibandPipe",
]

import logging
from typing import Any

import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.pex.config import Field, FieldValidationError, ListField
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.utils.logging import LsstLogAdapter


[docs] class SelBiasMultibandPipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", "dataType": "anacal_detect", "version": "fpfs", }, ):
[docs] src00 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_0_rot0_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src01 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_0_rot1_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src10 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_1_rot0_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src11 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_1_rot1_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] summary = cT.Output( doc="Summary statistics", name="{coaddName}_coadd_anacal_selbias_flux_{dataType}_{version}", storageClass="ArrowAstropy", dimensions=("skymap", "tract", "patch"), )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class SelBiasMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=SelBiasMultibandPipeConnections, ):
[docs] do_correct_selection_bias = Field[bool]( doc="Whether correct for selection bias", default=True, )
[docs] shear_name = Field[str]( doc="the shear component to test", default="g1", )
[docs] shear_value = Field[float]( doc="absolute value of the shear", default=0.02, )
[docs] shape_name = Field[str]( doc="ellipticity column name", default="fpfs_e1", )
[docs] dshape_name = Field[str]( doc="ellipticity's shear response column name", default="fpfs_de1", )
[docs] weight_name = Field[str]( doc="weight column name", default="fpfs_w", )
[docs] dweight_name = Field[str]( doc="weight's shear response column name", default="fpfs_dw", )
[docs] flux_name = Field[str]( doc="flux column name", default="fpfs_m00", )
[docs] dflux_name = Field[str]( doc="flux's shear response column name", default="fpfs_dm00", )
[docs] flux_cuts = ListField[float]( doc="lower limit of flux", default=[6.0, 12.0, 18.0, 24.0, 30.0], )
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing") if self.shear_name not in ["g1", "g2"]: raise FieldValidationError( self.__class__.shear_name, self, "shear_name can only be 'g1' or 'g2'", ) if self.shear_value < 0.0 or self.shear_value > 0.10: raise FieldValidationError( self.__class__.shear_value, self, "shear_value should be in [0.00, 0.10]", )
[docs] class SelBiasMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsSelBiasTask"
[docs] ConfigClass = SelBiasMultibandPipeConfig
def __init__( self, *, config: SelBiasMultibandPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, SelBiasMultibandPipeConfig)
[docs] self.sname = self.config.shear_name
[docs] self.svalue = self.config.shear_value
[docs] self.ename = self.config.shape_name
ins = "_dg" + self.ename[-1]
[docs] self.egname = self.config.dshape_name + ins
[docs] self.wname = self.config.weight_name
[docs] self.wgname = self.config.dweight_name + ins
[docs] self.fname = self.config.flux_name
[docs] self.fgname = self.config.dflux_name + ins
return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, SelBiasMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) outputs = self.run(**inputs) butlerQC.put(outputs, outputRefs) return
[docs] def measure_shear_flux_cut(self, src, flux_min): assert isinstance(self.config, SelBiasMultibandPipeConfig) en = self.ename egn = self.egname comp = en[-1] if comp not in {"1", "2"}: raise ValueError( f"Ellipticity column {en} must end with '1' or '2'" ) comp2 = "2" if comp == "1" else "1" en2 = en[:-1] + comp2 prefix, sep, shear_suffix = egn.rpartition("_dg") if not sep or not prefix.endswith(comp): raise ValueError(f"Unexpected shear response column format: {egn}") egn2 = prefix[:-1] + comp2 + sep + shear_suffix if "fpfs" in en: emax = 0.3 else: emax = 2.0 wname = self.wname wgname = self.wgname fname = self.fname fgname = self.fgname msk = (~np.isnan(src[fgname])) & (~np.isnan(src[egn])) src = src[msk] # selection esq = src[en] ** 2 + src[en2] ** 2 msk = (src[fname] > flux_min) & (esq < emax ** 2.0) tmp = src[msk] ell = np.sum(tmp[en] * tmp[wname]) res = np.sum(tmp[egn] * tmp[wname] + tmp[en] * tmp[wgname]) del msk, tmp, esq if self.config.do_correct_selection_bias: dg = 0.02 # selection esq = ( src[en] ** 2 + src[en2] ** 2 + 2.0 * dg * (src[en] * src[egn] + src[en2] * src[egn2]) ) msk = ( ((src[fname] + dg * src[fgname]) > flux_min) & (esq < emax) ) tmp = src[msk] ellp = np.sum(tmp[en] * tmp[wname]) del tmp, esq, msk # selection esq = ( src[en] ** 2 + src[en2] ** 2 - 2.0 * dg * (src[en] * src[egn] + src[en2] * src[egn2]) ) msk = ( ((src[fname] - dg * src[fgname]) > flux_min) & (esq < emax) ) tmp = src[msk] ellm = np.sum(tmp[en] * tmp[wname]) res_sel = (ellp - ellm) / 2.0 / dg del tmp, esq, msk else: res_sel = 0.0 return ell, (res + res_sel)
[docs] def run(self, *, src00, src10, src01=None, src11=None, **kwargs): assert isinstance(self.config, SelBiasMultibandPipeConfig) ncuts = len(self.config.flux_cuts) em = np.zeros(ncuts) ep = np.zeros(ncuts) rm = np.zeros(ncuts) rp = np.zeros(ncuts) for ic, flux_min in enumerate(self.config.flux_cuts): ell00, res00 = self.measure_shear_flux_cut(src00, flux_min) ell10, res10 = self.measure_shear_flux_cut(src10, flux_min) if (src01 is None) or (src11 is None): ell01, res01, ell11, res11 = (0.0, 0.0, 0.0, 0.0) else: ell01, res01 = self.measure_shear_flux_cut(src01, flux_min) ell11, res11 = self.measure_shear_flux_cut(src11, flux_min) em[ic] = ell00 + ell01 ep[ic] = ell10 + ell11 rm[ic] = res00 + res01 rp[ic] = res10 + res11 data_type = [ ("up1", "f8"), ("up2", "f8"), ("down", "f8"), ] summary = np.zeros(ncuts, dtype=data_type) summary["up1"] = (ep - em) / 2.0 summary["up2"] = (em + ep) / 2.0 summary["down"] = (rm + rp) / 2.0 return Struct(summary=summary)
[docs] class SelBiasSummaryMultibandPipeConnections( PipelineTaskConnections, dimensions=(), defaultTemplates={ "coaddName": "deep", "dataType": "anacal_joint", "version": "fpfs", }, ):
[docs] summary_list = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_coadd_anacal_selbias_flux_{dataType}_{version}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", multiple=True, deferLoad=True, )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class SelBiasSummaryMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=SelBiasSummaryMultibandPipeConnections, ):
[docs] estimate_multiplicative_bias = Field[bool]( doc="Whether estimate multiplicative bias", default=True, )
[docs] shear_value = Field[float]( doc="absolute value of the shear", default=0.02, )
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing") if self.shear_value < 0.0 or self.shear_value > 0.10: raise FieldValidationError( self.__class__.shear_value, self, "shear_value should be in [0.00, 0.10]", )
[docs] class SelBiasSummaryMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsSelBiasSummaryTask"
[docs] ConfigClass = SelBiasSummaryMultibandPipeConfig
def __init__( self, *, config: SelBiasSummaryMultibandPipeConfig | None = None, log: logging.Logger | LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, SelBiasSummaryMultibandPipeConfig)
[docs] self.svalue = self.config.shear_value
return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, SelBiasSummaryMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) self.run(**inputs) return
[docs] def run(self, *, summary_list, **kwargs): assert isinstance(self.config, SelBiasSummaryMultibandPipeConfig) up1 = [] up2 = [] down = [] for res in summary_list: res = res.get() up1.append(np.array(res["up1"])) up2.append(np.array(res["up2"])) down.append(np.array(res["down"])) nsim = len(up1) denom = np.average(down, axis=0) tmp = np.vstack(up1) + np.vstack(up2) print( "Positive shear:", np.average(tmp, axis=0) / denom, "+-", np.std(tmp, axis=0) / denom / np.sqrt(nsim), ) tmp = np.vstack(up2) - np.vstack(up1) print( "Negative shear:", np.average(tmp, axis=0) / denom, "+-", np.std(tmp, axis=0) / denom / np.sqrt(nsim), ) if self.config.estimate_multiplicative_bias: print( "Multiplicative bias:", np.average(up1, axis=0) / denom / self.svalue - 1, "+-", np.std(up1, axis=0) / denom / np.sqrt(nsim) / self.svalue, ) else: print( "We do not estimate multiplicative bias:", ) print( "Additive bias:", np.average(up2, axis=0) / denom, "+-", np.std(up2, axis=0) / denom / np.sqrt(nsim), ) return