Source code for xlens.analysis.selbias_ranforest

# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

__all__ = [
    "SelBiasRfMultibandPipeConnections",
    "SelBiasRfMultibandPipeConfig",
    "SelBiasRfMultibandPipe",
    "SelBiasRfSummaryMultibandPipeConnections",
    "SelBiasRfSummaryMultibandPipeConfig",
    "SelBiasRfSummaryMultibandPipe",
]

import pickle
from typing import Any

import lsst.pipe.base.connectionTypes as cT
import numpy as np
from lsst.pex.config import Field, FieldValidationError, ListField
from lsst.pipe.base import (
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    Struct,
)
from lsst.utils.logging import LsstLogAdapter


[docs] class SelBiasRfMultibandPipeConnections( PipelineTaskConnections, dimensions=("skymap", "tract", "patch"), defaultTemplates={ "coaddName": "deep", "dataType": "", }, ):
[docs] src00 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_0_rot0_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src01 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_0_rot1_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src10 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_1_rot0_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] src11 = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_1_rot1_coadd_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", )
[docs] summary = cT.Output( doc="Summary statistics", name="{coaddName}_coadd_anacal_selbias_ranforest_{dataType}", storageClass="ArrowAstropy", dimensions=("skymap", "tract", "patch"), )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class SelBiasRfMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=SelBiasRfMultibandPipeConnections, ):
[docs] do_correct_selection_bias = Field[bool]( doc="Whether correct for selection bias", default=True, )
[docs] shape_name = Field[str]( doc="ellipticity column name", default="e1", )
[docs] shear_name = Field[str]( doc="the shear component to test", default="g1", )
[docs] shear_value = Field[float]( doc="absolute value of the shear", default=0.02, )
[docs] thresholds = ListField[float]( doc="upper limit of score", default=[0.04, 0.08, 0.12, 0.16, 0.20], )
[docs] mag_zero = Field[float]( doc="calibration magnitude zero point", default=30.0, )
[docs] model_name = Field[str]( doc="random forest modle pickle file name", default="simple_sim_RF.pkl", )
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing") if self.shear_name not in ["g1", "g2"]: raise FieldValidationError( self.__class__.shear_name, self, "shear_name can only be 'g1' or 'g2'", ) if self.shear_value < 0.0 or self.shear_value > 0.10: raise FieldValidationError( self.__class__.shear_value, self, "shear_value should be in [0.00, 0.10]", )
def name_add_d(ename, nchars): return ename[:-nchars] + "d" + ename[-nchars:]
[docs] class SelBiasRfMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsSelBiasRfTask"
[docs] ConfigClass = SelBiasRfMultibandPipeConfig
def __init__( self, *, config: SelBiasRfMultibandPipeConfig | None = None, log: LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, SelBiasRfMultibandPipeConfig)
[docs] self.sname = self.config.shear_name
[docs] self.svalue = self.config.shear_value
[docs] self.ename = self.config.shape_name
ins = "_dg" + self.ename[-1]
[docs] self.egname = name_add_d(self.ename, 2) + ins
[docs] self.wname = "fpfs_w"
[docs] self.wgname = name_add_d(self.wname, 1) + ins
with open(self.config.model_name, "rb") as f: self.clf = pickle.load(f) return @staticmethod
[docs] def measure_distorted_photomoetry(*, src, dg, mag_zero): phot = [] for band in "grizy": phot.append( mag_zero - np.log10( src[f"{band}_fpfs1_m00"] + dg * src[f"{band}_fpfs1_dm00_dg1"] ) * 2.5 ) phot = np.vstack(phot).T return phot
[docs] def measure_shear(self, *, src, dg, threshold): assert isinstance(self.config, SelBiasRfMultibandPipeConfig) en = self.ename egn = self.egname wname = self.wname wgname = self.wgname phot = self.measure_distorted_photomoetry( src=src, dg=dg, mag_zero=self.config.mag_zero, ) scores = self.clf.predict_proba(phot)[:, 1] mask = scores < threshold tmp = src[mask] ell = np.sum(tmp[en] * tmp[wname]) res = np.sum(tmp[egn] * tmp[wname] + tmp[en] * tmp[wgname]) return { "ellipticity": ell, "response": res, }
[docs] def measure_shear_ranforest(self, src, threshold): assert isinstance(self.config, SelBiasRfMultibandPipeConfig) summary = self.measure_shear(src=src, dg=0.00, threshold=threshold) ell = summary["ellipticity"] res = summary["response"] if self.config.do_correct_selection_bias: dg = 0.01 ellp = self.measure_shear( src=src, dg=dg, threshold=threshold, )["ellipticity"] ellm = self.measure_shear( src=src, dg=-dg, threshold=threshold, )["ellipticity"] res_sel = (ellp - ellm) / 2.0 / dg else: res_sel = 0.0 return ell, (res + res_sel)
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, SelBiasRfMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) outputs = self.run(**inputs) butlerQC.put(outputs, outputRefs) return
[docs] def run(self, *, src00, src01, src10, src11, **kwargs): assert isinstance(self.config, SelBiasRfMultibandPipeConfig) ncuts = len(self.config.thresholds) em = np.zeros(ncuts) ep = np.zeros(ncuts) rm = np.zeros(ncuts) rp = np.zeros(ncuts) for ic, threshold in enumerate(self.config.thresholds): ell00, res00 = self.measure_shear_ranforest(src00, threshold) ell10, res10 = self.measure_shear_ranforest(src10, threshold) ell01, res01 = self.measure_shear_ranforest(src01, threshold) ell11, res11 = self.measure_shear_ranforest(src11, threshold) em[ic] = ell00 + ell01 ep[ic] = ell10 + ell11 rm[ic] = res00 + res01 rp[ic] = res10 + res11 data_type = [ ("up1", "f8"), ("up2", "f8"), ("down", "f8"), ] summary = np.zeros(ncuts, dtype=data_type) summary["up1"] = (ep - em) / 2.0 summary["up2"] = (em + ep) / 2.0 summary["down"] = (rm + rp) / 2.0 return Struct(summary=summary)
[docs] class SelBiasRfSummaryMultibandPipeConnections( PipelineTaskConnections, dimensions=(), defaultTemplates={ "coaddName": "deep", "dataType": "", }, ):
[docs] summary_list = cT.Input( doc="Source catalog with all the measurement generated in this task", name="{coaddName}_coadd_anacal_selbias_ranforest_{dataType}", dimensions=("skymap", "tract", "patch"), storageClass="ArrowAstropy", multiple=True, deferLoad=True, )
def __init__(self, *, config=None): super().__init__(config=config)
[docs] class SelBiasRfSummaryMultibandPipeConfig( PipelineTaskConfig, pipelineConnections=SelBiasRfSummaryMultibandPipeConnections, ):
[docs] shape_name = Field[str]( doc="ellipticity column name", default="e1", )
[docs] shear_name = Field[str]( doc="the shear component to test", default="g1", )
[docs] shear_value = Field[float]( doc="absolute value of the shear", default=0.02, )
[docs] def validate(self): super().validate() if len(self.connections.dataType) == 0: raise ValueError("connections.dataType missing") if self.shear_name not in ["g1", "g2"]: raise FieldValidationError( self.__class__.shear_name, self, "shear_name can only be 'g1' or 'g2'", ) if self.shear_value < 0.0 or self.shear_value > 0.10: raise FieldValidationError( self.__class__.shear_value, self, "shear_value should be in [0.00, 0.10]", )
[docs] class SelBiasRfSummaryMultibandPipe(PipelineTask):
[docs] _DefaultName = "FpfsSelBiasRfSummaryTask"
[docs] ConfigClass = SelBiasRfSummaryMultibandPipeConfig
def __init__( self, *, config: SelBiasRfSummaryMultibandPipeConfig | None = None, log: LsstLogAdapter | None = None, initInputs: dict[str, Any] | None = None, **kwargs: Any, ): super().__init__( config=config, log=log, initInputs=initInputs, **kwargs ) assert isinstance(self.config, SelBiasRfSummaryMultibandPipeConfig)
[docs] self.ename = self.config.shape_name
[docs] self.sname = self.config.shear_name
[docs] self.svalue = self.config.shear_value
return
[docs] def runQuantum(self, butlerQC, inputRefs, outputRefs): assert isinstance(self.config, SelBiasRfSummaryMultibandPipeConfig) # Retrieve the filename of the input exposure inputs = butlerQC.get(inputRefs) self.run(**inputs) return
[docs] def run(self, *, summary_list, **kwargs): assert isinstance(self.config, SelBiasRfSummaryMultibandPipeConfig) up1 = [] up2 = [] down = [] for res in summary_list: res = res.get() up1.append(np.array(res["up1"])) up2.append(np.array(res["up2"])) down.append(np.array(res["down"])) nsim = len(up1) denom = np.average(down, axis=0) tmp = np.vstack(up1) + np.vstack(up2) print( "Positive shear:", np.average(tmp, axis=0) / denom, "+-", np.std(tmp, axis=0) / denom / np.sqrt(nsim), ) tmp = np.vstack(up2) - np.vstack(up1) print( "Negative shear:", np.average(tmp, axis=0) / denom, "+-", np.std(tmp, axis=0) / denom / np.sqrt(nsim), ) if self.sname[-1] == self.ename[-1]: print( "Multiplicative bias:", np.average(up1, axis=0) / denom / self.svalue - 1, "+-", np.std(up1, axis=0) / denom / np.sqrt(nsim) / self.svalue, ) else: print( "We do not estimate multiplicative bias:", ) print( "Additive bias:", np.average(up2, axis=0) / denom, "+-", np.std(up2, axis=0) / denom / np.sqrt(nsim), ) return