diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 96d1ba89..f34aa076 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -10,8 +10,10 @@ on:
     - cron: '0 12 * * 1'

 env:
-  EXAMPLE_DATA_FILE_VERSION: v3
-
+  EXAMPLE_DATA_FILE_VERSION: v5
+  # Setting this env var did not seem to take effect, so for now
+  # find/replace the image name directly when changing it.
+  # CONTAINER_IMAGE: ghcr.io/lsstdesc/txpipe:latest

 jobs:
   # Run a download step first so that it is in
@@ -110,7 +112,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest, macos-14]
-        pipeline: [metadetect, metacal, redmagic, lensfit, metadetect_source_only]
+        pipeline: [metadetect, metacal, redmagic, lensfit, metadetect_source_only, mock_shear]
         include:
           - os: ubuntu-latest
             INSTALL_DEPS: echo nothing to do
@@ -158,7 +160,12 @@ jobs:
             export TX_DASK_DEBUG=1
         fi
         ceci examples/${{ matrix.pipeline }}/pipeline.yml
-        test -f data/example/outputs_${{ matrix.pipeline }}/shear_xi_plus.png
+
+        if [ "${{ matrix.pipeline }}" == "mock_shear" ]; then
+            test -f data/example/outputs_mock_shear/binned_shear_catalog.hdf5
+        else
+            test -f data/example/outputs_${{ matrix.pipeline }}/shear_xi_plus.png
+        fi

     - name: Show logs
       if: always()
diff --git a/bin/generate-nfw-shear-cat.py b/bin/generate-nfw-shear-cat.py
new file mode 100644
index 00000000..076a0333
--- /dev/null
+++ b/bin/generate-nfw-shear-cat.py
@@ -0,0 +1,68 @@
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--output", type=str, default="simple_cat.txt")
+parser.add_argument("--nmax", type=int, default=10000, help="Number of objects to generate, though the final catalog is smaller after the redshift cut")
+parser.add_argument("--mass", type=float, default=10.0e14, help="Cluster mass in Msun")
+parser.add_argument("--cluster_z", type=float, default=0.22, help="Cluster redshift")
+parser.add_argument("--concentration", type=float, default=4.0, help="Cluster concentration parameter")
+parser.add_argument("--size", type=float, default=240.0, help="Size of the region in arcmin")
+parser.add_argument("--mean-z", type=float, default=0.5, help="Mean redshift of the background galaxies")
+parser.add_argument("--sigma-z", type=float, default=0.1, help="Redshift std dev of the background galaxies")
+parser.add_argument("--mean-snr", type=float, default=20., help="Mean SNR")
+parser.add_argument("--mean-size", type=float, default=0.3, help="Galaxy mean size^2 T parameter in arcsec^2")
+parser.add_argument("--sigma-size", type=float, default=0.3, help="Galaxy std dev size^2 T parameter in arcsec^2")
+args = parser.parse_args()
+
+mass = args.mass
+cluster_z = args.cluster_z
+concentration = args.concentration
+
+# Defer the heavy imports until after argument parsing so that --help is fast
+import galsim
+import numpy as np
+from astropy.table import Table
+
+nfw = galsim.NFWHalo(mass, concentration, cluster_z)
+
+half_size = args.size / 2
+xmin = -half_size
+xmax = half_size
+ymin = -half_size
+ymax = half_size
+
+N = args.nmax
+
+x = np.random.uniform(xmin, xmax, N)
+y = np.random.uniform(ymin, ymax, N)
+z = np.random.normal(args.mean_z, args.sigma_z, N)
+
+# Keep only the galaxies behind the cluster, since only they are lensed
+w = np.where(z > nfw.z)
+x1 = x[w]
+y1 = y[w]
+z1 = z[w]
+n = z1.size
+
+ra = np.zeros(n) + x1 / 3600
+dec = np.zeros(n) + y1 / 3600
+g1, g2 = nfw.getShear([x1, y1], z1, reduced=False)
+s2n = np.random.exponential(args.mean_snr, size=n)
+print("Mean S/N:", s2n.mean())
+
+# This should give plenty of selected objects since we cut on T/Tpsf > 0.5 by default
+# and the Tpsf default is ~0.2
+T = np.random.normal(args.mean_size, args.sigma_size, size=n).clip(0.01, np.inf)
+
+data = {
+    "ra": ra,
+    "dec": dec,
+    "g1": g1,
+    "g2": g2,
+    "s2n": s2n,
+    "T": T,
+    "redshift": z1,
+}
+
+table = Table(data=data)
+table.write(args.output, overwrite=True, format="ascii.commented_header")
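
For reference, the mock input catalog used by the new example can be generated with an invocation along these lines; the flags are the ones defined in the argparse block above, and the output path matches the pipeline input declared further down:

    python bin/generate-nfw-shear-cat.py --mass 1.0e15 --cluster_z 0.22 \
        --output data/example/inputs/mock_nfw_shear_catalog.txt
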
diff --git a/bin/make_single_cluster_catalog.py b/bin/make_single_cluster_catalog.py
new file mode 100644
index 00000000..c4e6d901
--- /dev/null
+++ b/bin/make_single_cluster_catalog.py
@@ -0,0 +1,24 @@
+import numpy as np
+import h5py
+ra = np.array([0.0])
+dec = np.array([0.0])
+z = np.array([0.22])
+z_err = np.array([0.01])
+ids = np.array([0])
+richness = np.array([10.0])
+richness_err = np.array([1.0])
+scale = np.array([1.0])
+
+with h5py.File("mock_single_cluster_catalog.hdf5", "w") as f:
+    g = f.create_group("clusters")
+    # one dataset per column; this mock catalog contains a single cluster
+    g.create_dataset("cluster_id", data=ids)
+    g.create_dataset("ra", data=ra)
+    g.create_dataset("dec", data=dec)
+    g.create_dataset("redshift", data=z)
+    g.create_dataset("redshift_err", data=z_err)
+    g.create_dataset("richness", data=richness)
+    g.create_dataset("richness_err", data=richness_err)
+    g.create_dataset("scaleval", data=scale)
+
+    g.attrs["n_clusters"] = 1
diff --git a/environment-piponly.yml b/environment-piponly.yml
index d0d52d53..a700caae 100644
--- a/environment-piponly.yml
+++ b/environment-piponly.yml
@@ -3,7 +3,7 @@ dependencies:
     - ceci==2.0.1
     - git+https://github.com/jlvdb/hyperbolic@b88b107a291fa16c2006cf971ce610248d58e94c
     - dask-mpi>=2022.4.0
-    - git+https://github.com/LSSTDESC/CLMM.git@1.12.5
+    - git+https://github.com/LSSTDESC/CLMM.git@1.14.1
     - parallel-statistics==0.13
     - sacc[all]==0.14
     - jax==0.4.27
diff --git a/examples/mock_shear/config.yml b/examples/mock_shear/config.yml
new file mode 100644
index 00000000..0d22c398
--- /dev/null
+++ b/examples/mock_shear/config.yml
@@ -0,0 +1,26 @@
+global:
+    chunk_rows: 100000
+    pixelization: healpix
+    nside: 64
+    sparse: true
+
+
+TXSourceSelectorSimple:
+    input_pz: false
+    true_z: true
+    bands: riz
+    T_cut: 0.5
+    s2n_cut: 10.0
+    max_rows: 1000
+    delta_gamma: 0.02
+    source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0]
+    shear_prefix: ''
+    verbose: true
+
+TXMockTruthPZ:
+    mock_sigma_z: 0.001
+
+
+CLClusterShearCatalogs:
+    redshift_cut_criterion: zmode
+    redshift_weight_criterion: zmode
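
As a quick sanity check, the single-cluster catalog written by bin/make_single_cluster_catalog.py above can be inspected with h5py; a minimal sketch:

    import h5py

    with h5py.File("mock_single_cluster_catalog.hdf5") as f:
        g = f["clusters"]
        print(g.attrs["n_clusters"])                      # 1
        print(g["ra"][:], g["dec"][:], g["redshift"][:])  # [0.] [0.] [0.22]
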
diff --git a/examples/mock_shear/pipeline.yml b/examples/mock_shear/pipeline.yml
new file mode 100644
index 00000000..252d8fe9
--- /dev/null
+++ b/examples/mock_shear/pipeline.yml
@@ -0,0 +1,54 @@
+# Stages to run
+stages:
+    - name: TXSimpleMock            # Convert a text-file mock catalog to HDF5
+    - name: TXSourceSelectorSimple  # Select objects and split them into source bins
+    - name: TXShearCalibration      # Calibrate and split the source sample tomographically
+    - name: TXMockTruthPZ           # Generate PDFs as narrow Gaussians centered on the true redshifts
+      aliases:
+          photoz_pdfs: source_photoz_pdfs
+    - name: CLClusterShearCatalogs  # Extract and weight the shear catalog around every cluster
+
+
+
+# modules and packages to import that have pipeline
+# stages defined in them
+modules: >
+    txpipe
+
+# where to find any modules that are not in this repo,
+# and any other code we need
+python_paths:
+    - submodules/WLMassMap/python/desc/
+
+# Where to put outputs
+output_dir: data/example/outputs_mock_shear
+
+# How to run the pipeline: mini, parsl, or cwl
+launcher:
+    name: mini
+    interval: 1.0
+
+# Where to run the pipeline: cori-interactive, cori-batch, or local
+site:
+    name: local
+    max_threads: 2
+
+# configuration settings
+config: examples/mock_shear/config.yml
+
+# These are overall inputs to the whole pipeline, not generated within it
+inputs:
+    mock_shear_catalog: data/example/inputs/mock_nfw_shear_catalog.txt
+    calibration_table: data/example/inputs/sample_cosmodc2_w10year_errors.dat
+    cluster_catalog: data/example/inputs/mock_single_cluster_catalog.hdf5
+    fiducial_cosmology: data/fiducial_cosmology.yml
+
+
+
+# if supported by the launcher, restart the pipeline where it left off
+# if interrupted
+resume: true
+# where to put output logs for individual stages
+log_dir: data/example/logs_mock_shear
+# where to put an overall parsl pipeline log
+pipeline_log: data/example/mock_shear_log.txt
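
With the config and pipeline files in place, the example runs the same way as the existing ones; this is exactly what the CI job above does:

    ceci examples/mock_shear/pipeline.yml
    test -f data/example/outputs_mock_shear/binned_shear_catalog.hdf5
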
diff --git a/txpipe/calibrate.py b/txpipe/calibrate.py
index 87c8e788..720ff144 100644
--- a/txpipe/calibrate.py
+++ b/txpipe/calibrate.py
@@ -74,6 +74,7 @@ def run(self):
         # is needed
         tomo_file = self.get_input("shear_tomography_catalog")
         cals, cal2d = Calibrator.load(tomo_file, null=use_true)
+        print("Using calibration method:", cal2d.__class__.__name__)

         # The catalog columns are named differently in different cases
         # Get the correct shear catalogs
diff --git a/txpipe/ingest/mocks.py b/txpipe/ingest/mocks.py
index b6cf7826..d42314ca 100755
--- a/txpipe/ingest/mocks.py
+++ b/txpipe/ingest/mocks.py
@@ -1,5 +1,5 @@
 from ..base_stage import PipelineStage
-from ..data_types import ShearCatalog, HDFFile
+from ..data_types import ShearCatalog, HDFFile, TextFile, QPPDFFile
 from ..utils import (
     band_variants,
     metacal_variants,
@@ -1219,6 +1219,104 @@ def generate_mock_metacal_mag_responses(bands, nobj):
     return mag_responses

+class TXSimpleMock(PipelineStage):
+    """
+    Load an ascii astropy table and put it in shear catalog format.
+    """
+    name = "TXSimpleMock"
+    inputs = [("mock_shear_catalog", TextFile)]
+    outputs = [("shear_catalog", ShearCatalog)]
+    config_options = {
+        "mock_size_snr": False,
+    }
+    def run(self):
+        from astropy.table import Table
+        import numpy as np
+
+        # Load the data. We are assuming here it is small enough to fit in memory
+        input_filename = self.get_input("mock_shear_catalog")
+        input_data = Table.read(input_filename, format="ascii")
+        n = len(input_data)
+
+        data = {}
+        # required columns
+        for col in ["ra", "dec", "g1", "g2", "s2n", "T"]:
+            data[col] = input_data[col]
+
+        # It's most likely we will have a redshift column.
+        # Check for both that and "redshift_true"
+        if "redshift" in input_data.colnames:
+            data["redshift_true"] = input_data["redshift"]
+        elif "redshift_true" in input_data.colnames:
+            data["redshift_true"] = input_data["redshift_true"]
+
+        # If there is an ID column then use it, but otherwise just use
+        # sequential IDs
+        if "id" in input_data.colnames:
+            data["galaxy_id"] = input_data["id"]
+        else:
+            data["galaxy_id"] = np.arange(len(input_data))
+
+        # If these columns are not present then we fake them
+        defaults = {
+            "T_err": 0.0,
+            "psf_g1": 0.0,
+            "psf_g2": 0.0,
+            "psf_T_mean": 0.202,  # this corresponds to a FWHM of 0.75 arcsec
+            "weight": 1.0,
+            "flags": 0,
+        }
+
+        for key, value in defaults.items():
+            if key in input_data.colnames:
+                data[key] = input_data[key]
+            else:
+                data[key] = np.full(n, value)
+
+        self.save_catalog(data)
+
+    def save_catalog(self, data):
+        with self.open_output("shear_catalog") as f:
+            g = f.create_group("shear")
+            g.attrs["catalog_type"] = "simple"
+            for key, value in data.items():
+                g.create_dataset(key, data=value)
+
+
+class TXMockTruthPZ(PipelineStage):
+    name = "TXMockTruthPZ"
+    inputs = [("shear_catalog", ShearCatalog)]
+    outputs = [("photoz_pdfs", QPPDFFile)]
+    config_options = {
+        "mock_sigma_z": 0.001,
+    }
+    def run(self):
+        import qp
+        import numpy as np
+        sigma_z = self.config["mock_sigma_z"]
+
+        # read the input truth redshifts
+        with self.open_input("shear_catalog", wrapper=True) as f:
+            group = f.file[f.get_primary_catalog_group()]
+            redshifts = group["redshift_true"][:]
+
+        # evaluation grid for the PDFs
+        zgrid = np.linspace(0, 3, 301)
+
+        # make a Gaussian PDF for each object, with width scaling as (1 + z)
+        spread_z = sigma_z * (1 + redshifts)
+        delta = zgrid[np.newaxis, :] - redshifts[:, np.newaxis]
+        pdfs = np.exp(-0.5 * (delta / spread_z[:, np.newaxis])**2) / np.sqrt(2 * np.pi) / spread_z[:, np.newaxis]
+
+        # build a qp interpolated ensemble and attach truth-based point estimates
+        q = qp.Ensemble(qp.interp, data=dict(xvals=zgrid, yvals=pdfs))
+        q.set_ancil(dict(zmode=redshifts, zmean=redshifts, zmedian=redshifts))
+        q.write_to(self.get_output("photoz_pdfs"))
+
+
 def test():
     import pylab
@@ -1235,3 +1333,5 @@ def test():
     results = make_mock_photometry(n_visit, bands, data, True)
     pylab.hist(results["snr_r"], bins=50, histtype="step")
     pylab.savefig("snr_r.png")
+
+
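
Assuming the standard qp API, the PDF file written by TXMockTruthPZ above can be read back for checking; the file name here is illustrative (in the pipeline it is aliased to source_photoz_pdfs):

    import qp

    ens = qp.read("source_photoz_pdfs.hdf5")
    print(ens.npdf)                # one PDF per catalog object
    print(ens.ancil["zmode"][:5])  # truth-based point estimates
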
diff --git a/txpipe/source_selector.py b/txpipe/source_selector.py
index 10f483d9..48b5a708 100755
--- a/txpipe/source_selector.py
+++ b/txpipe/source_selector.py
@@ -17,12 +17,14 @@
     LensfitCalculator,
     HSCCalculator,
     MetaDetectCalculator,
+    MockCalculator,
 )
 from .utils.calibrators import (
     MetaCalibrator,
     MetaDetectCalibrator,
     LensfitCalibrator,
     HSCCalibrator,
+    NullCalibrator,
 )
 from .binning import build_tomographic_classifier, apply_classifier
 import numpy as np
@@ -792,6 +794,52 @@ def compute_output_stats(self, calculator, mean, variance):
         return BinStats(N, Neff, mean_e, sigma_e, calibrator)

+class TXSourceSelectorSimple(TXSourceSelectorBase):
+    """
+    Source selection and tomography for mock catalogs that do not
+    require any calibration.
+    """
+    name = "TXSourceSelectorSimple"
+    config_options = TXSourceSelectorBase.config_options.copy()
+
+    def data_iterator(self):
+        chunk_rows = self.config["chunk_rows"]
+        bands = self.config["bands"]
+
+        # Select the columns we need
+        shear_cols = [
+            "psf_T_mean",
+            "weight",
+            "flags",
+            "T",
+            "s2n",
+            "g1",
+            "g2",
+        ]
+        if self.config["input_pz"]:
+            shear_cols += ["mean_z"]
+        elif self.config["true_z"]:
+            shear_cols += ["redshift_true"]
+        else:
+            shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="hsc")
+
+        # Iterate using parent class method
+        return self.iterate_hdf("shear_catalog", "shear", shear_cols, chunk_rows)
+
+    def setup_response_calculators(self, nbin_source):
+        calculators = [MockCalculator(self.select) for i in range(nbin_source)]
+        calculators.append(MockCalculator(self.select_2d))
+        return calculators
+
+    def compute_output_stats(self, calculator, mean, variance):
+        # Collate the counts; no calibration is needed for this catalog type
+        N, Neff = calculator.collect(self.comm, allgather=True)
+        calibrator = NullCalibrator(mean)
+        mean_e = mean.copy()
+        sigma_e = np.sqrt(0.5 * (variance[0] + variance[1]))
+        return BinStats(N, Neff, mean_e, sigma_e, calibrator)
+
+
 class TXSourceSelectorHSC(TXSourceSelectorBase):
     """
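
For context, the Neff that MockCalculator (below) returns is the standard weighted effective number count, Neff = (sum of weights)^2 / (sum of squared weights), which a few lines of numpy reproduce:

    import numpy as np

    w = np.array([1.0, 1.0, 0.5])
    neff = w.sum() ** 2 / (w ** 2).sum()
    print(neff)  # 2.78: downweighting one object gives fewer than 3 effective objects
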
diff --git a/txpipe/utils/calibration_tools.py b/txpipe/utils/calibration_tools.py
index 5d0f8fff..ea4c288d 100755
--- a/txpipe/utils/calibration_tools.py
+++ b/txpipe/utils/calibration_tools.py
@@ -13,7 +13,6 @@ def read_shear_catalog_type(stage):
     with stage.open_input("shear_catalog", wrapper=True) as f:
         shear_catalog_type = f.catalog_type
         stage.config["shear_catalog_type"] = shear_catalog_type
-        print('Temporarily setting cat_type to: ', f.catalog_type)
     return shear_catalog_type

@@ -744,6 +743,107 @@ def collect(self, comm=None, allgather=False):
     return R, K, count, Neff

+class MockCalculator:
+    """
+    This class calculates tomographic statistics for mock catalogs
+    where no calibration is necessary.
+
+    It only accumulates the statistics of the selected object weights,
+    rather than any calibration quantities like its sibling classes.
+
+    """
+
+    def __init__(self, selector):
+        """
+        Initialize the calculator with the function you will use to select
+        objects. That function should take at least one argument,
+        the chunk of data to select on.
+
+        The selector can take further *args and **kwargs, passed in when adding
+        data.
+
+        Parameters
+        ----------
+        selector: function
+            Function that selects objects
+        """
+        self.selector = selector
+        self.count = 0
+        self.sum_weights = 0
+        self.sum_weights_sq = 0
+
+    def add_data(self, data, *args, **kwargs):
+        """Select objects from a new chunk of data and tally their weights
+
+        Parameters
+        ----------
+        data: dict
+            Dictionary of data columns to select on and add
+
+        *args
+            Positional arguments to be passed to the selection function
+        **kwargs
+            Keyword arguments to be passed to the selection function
+
+        """
+        data = _DataWrapper(data, "")
+        sel = self.selector(data, *args, **kwargs)
+
+        # Tally the weights of the selected objects; the sum of squared
+        # weights is accumulated for the effective number count below
+        w = data["weight"]
+        w = w[sel]
+        self.count += w.size
+        self.sum_weights += np.sum(w)
+        self.sum_weights_sq += np.sum(w ** 2)
+
+        return sel
+
+    def collect(self, comm=None, allgather=False):
+        """
+        Finalize and sum up all the weight tallies, returning the
+        overall counts.
+
+        Parameters
+        ----------
+        comm: MPI Communicator
+            If supplied, the tallies from all processes are combined.
+            With allgather=True every process returns the same final
+            values; otherwise only the root process does.
+
+        Returns
+        -------
+        N: int
+            Total object count
+
+        Neff: float
+            Total effective number of galaxies
+
+        """
+        # The total number of objects is just the
+        # number from all the processes summed together.
+        if comm is not None:
+            if allgather:
+                count = comm.allreduce(self.count)
+                sum_weights = comm.allreduce(self.sum_weights)
+                sum_weights_sq = comm.allreduce(self.sum_weights_sq)
+            else:
+                count = comm.reduce(self.count)
+                sum_weights = comm.reduce(self.sum_weights)
+                sum_weights_sq = comm.reduce(self.sum_weights_sq)
+                # comm.reduce returns None everywhere except the root process
+                if count is None:
+                    return None, None
+        else:
+            count = self.count
+            sum_weights = self.sum_weights
+            sum_weights_sq = self.sum_weights_sq
+
+        # Neff is the effective number count,
+        # (sum of weights)^2 / (sum of squared weights)
+        return count, sum_weights ** 2 / sum_weights_sq
+
+
 class MeanShearInBins:
     def __init__(
         self,
diff --git a/txpipe/utils/calibrators.py b/txpipe/utils/calibrators.py
index 590eddf4..e2f448c9 100644
--- a/txpipe/utils/calibrators.py
+++ b/txpipe/utils/calibrators.py
@@ -48,7 +48,7 @@ def load(cls, tomo_file, null=False):
         cat_type = f["tomography"].attrs["catalog_type"]

         # choose a subclass based on this
-        if null:
+        if null or cat_type == "simple":
             subcls = NullCalibrator
         elif cat_type == "metacal":
             subcls = MetaCalibrator
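
With this last change, loading calibrators from a tomography file whose catalog_type is "simple" transparently falls back to the null calibration. A minimal sketch of how a downstream stage uses this, assuming the usual apply() signature and an illustrative file name:

    import numpy as np
    from txpipe.utils.calibrators import Calibrator

    # one calibrator per source bin, plus one for the full 2D sample;
    # for catalog_type "simple" these should all be NullCalibrator instances
    cals, cal2d = Calibrator.load("shear_tomography_catalog.hdf5")
    g1 = np.array([0.01, -0.02])
    g2 = np.array([0.00, 0.03])
    g1, g2 = cal2d.apply(g1, g2)
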