From dda6f8d3ad625a891cbd583b184e471c394ec01e Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Mon, 20 Apr 2020 20:34:29 -0700 Subject: [PATCH 1/3] feat(QuadraticPSEstimationXLarge): distribution over bands --- draco/analysis/powerspectrum.py | 107 +++++++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 1 deletion(-) diff --git a/draco/analysis/powerspectrum.py b/draco/analysis/powerspectrum.py index 113d9b062..f4a64b367 100644 --- a/draco/analysis/powerspectrum.py +++ b/draco/analysis/powerspectrum.py @@ -9,8 +9,10 @@ import numpy as np +import time +from mpi4py import MPI -from caput import config +from caput import config, mpiarray, mpiutil from ..core import task, containers @@ -69,6 +71,8 @@ def process(self, klmodes): q_list = [] + st = time.time() + for mi, m in klmodes.vis[:].enumerate(axis=0): ps_single = pse.q_estimator(m, klmodes.vis[m, : klmodes.nmode[m]]) q_list.append(ps_single) @@ -76,6 +80,107 @@ def process(self, klmodes): q = klmodes.comm.allgather(np.array(q_list).sum(axis=0)) q = np.array(q).sum(axis=0) + et = time.time() + if mpiutil.rank0: + print("m, time needed for quadratic pse", m, et - st) + + # reading from directory + fisher, bias = pse.fisher_bias() + + ps = containers.Powerspectrum2D( + kperp_edges=pse.kperp_bands, kpar_edges=pse.kpar_bands + ) + + npar = len(ps.index_map["kpar"]) + nperp = len(ps.index_map["kperp"]) + + # Calculate the right unmixing matrix for each ps type + if self.pstype == "unwindowed": + M = la.pinv(fisher, rcond=1e-8) + elif self.pstype == "uncorrelated": + Fh = la.cholesky(fisher) + M = la.inv(Fh) / Fh.sum(axis=1)[:, np.newaxis] + elif self.pstype == "minimum_variance": + M = np.diag(fisher.sum(axis=1) ** -1) + + ps.powerspectrum[:] = np.dot(M, q - bias).reshape(nperp, npar) + ps.C_inv[:] = fisher.reshape(nperp, npar, nperp, npar) + + return ps + + +class QuadraticPSEstimationXLarge(QuadraticPSEstimation): + """Quadratic PS estimation for large arrays""" + + def process(self, klmodes): + """Estimate the power spectrum from the given data. + + Parameters + ---------- + klmodes : containers.KLModes + + Returns + ------- + ps : containers.PowerSpectrum + """ + + import scipy.linalg as la + + if not isinstance(klmodes, containers.KLModes): + raise ValueError( + "Input container must be instance of " + "KLModes (received %s)" % klmodes.__class__ + ) + + klmodes.redistribute("m") + + pse = self.manager.psestimators[self.psname] + pse.genbands() + + tel = self.manager.telescope + + self.sky_array = mpiarray.MPIArray( + (tel.mmax + 1, tel.nfreq, tel.lmax + 1), + axis=0, + comm=MPI.COMM_WORLD, + dtype=np.complex128, + ) + + self.sky_array[:] = 0.0 + + st = time.time() + + for mi, m in klmodes.vis[:].enumerate(axis=0): + sky_vec1, sky_vec2 = pse.project_vector_kl_to_sky( + m, klmodes.vis[m, : klmodes.nmode[m]] + ) + if (sky_vec1.shape[0], sky_vec2.shape[0]) != (0, 0): + self.sky_array[mi] = sky_vec1 + + self.sky_array = self.sky_array.allgather() + + pse.qa = mpiarray.MPIArray( + (tel.mmax + 1, pse.nbands, 1), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64 + ) + + pse.qa[:] = 0.0 + + for m in range(tel.mmax + 1): + if np.sum(self.sky_array[m]) != 0: + sky_m = self.sky_array[m].reshape((tel.nfreq, tel.lmax + 1, 1)) + pse.q_estimator(m, sky_m, sky_m) + + # Redistribute over m's and have bands not distributed + pse.qa = pse.qa.redistribute(axis=0) + # Sum over local m's for all bands + q_local = np.sum(np.array(pse.qa[:]), axis=0) + # Collect all m on each rank + q = mpiutil.allreduce(q_local[:, 0], op=MPI.SUM) + + et = time.time() + if mpiutil.rank0: + print("m, time needed for quadratic pse", m, et - st) + # reading from directory fisher, bias = pse.fisher_bias() From 2166a897a19e37a80eb326719d1f1f41c4e7c8cd Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Wed, 6 May 2020 11:47:18 -0700 Subject: [PATCH 2/3] fix(powerspectrum): clean up print statements --- draco/analysis/powerspectrum.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/draco/analysis/powerspectrum.py b/draco/analysis/powerspectrum.py index f4a64b367..bfb887bb4 100644 --- a/draco/analysis/powerspectrum.py +++ b/draco/analysis/powerspectrum.py @@ -148,8 +148,6 @@ def process(self, klmodes): self.sky_array[:] = 0.0 - st = time.time() - for mi, m in klmodes.vis[:].enumerate(axis=0): sky_vec1, sky_vec2 = pse.project_vector_kl_to_sky( m, klmodes.vis[m, : klmodes.nmode[m]] @@ -177,10 +175,6 @@ def process(self, klmodes): # Collect all m on each rank q = mpiutil.allreduce(q_local[:, 0], op=MPI.SUM) - et = time.time() - if mpiutil.rank0: - print("m, time needed for quadratic pse", m, et - st) - # reading from directory fisher, bias = pse.fisher_bias() From cfca22046d06d798626e4b01ccf444dff8849222 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Thu, 4 Feb 2021 20:59:28 -0800 Subject: [PATCH 3/3] fix(powerspectrum): code clean up --- draco/analysis/powerspectrum.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/draco/analysis/powerspectrum.py b/draco/analysis/powerspectrum.py index bfb887bb4..be058a693 100644 --- a/draco/analysis/powerspectrum.py +++ b/draco/analysis/powerspectrum.py @@ -109,7 +109,7 @@ def process(self, klmodes): return ps -class QuadraticPSEstimationXLarge(QuadraticPSEstimation): +class QuadraticPSEstimationLarge(QuadraticPSEstimation): """Quadratic PS estimation for large arrays""" def process(self, klmodes): @@ -149,10 +149,11 @@ def process(self, klmodes): self.sky_array[:] = 0.0 for mi, m in klmodes.vis[:].enumerate(axis=0): - sky_vec1, sky_vec2 = pse.project_vector_kl_to_sky( - m, klmodes.vis[m, : klmodes.nmode[m]] + sky_vec1 = pse.project_vector_kl_to_sky( + m, + klmodes.vis[m, :klmodes.nmode[m]] ) - if (sky_vec1.shape[0], sky_vec2.shape[0]) != (0, 0): + if sky_vec1.shape[0] != 0: self.sky_array[mi] = sky_vec1 self.sky_array = self.sky_array.allgather() @@ -166,7 +167,7 @@ def process(self, klmodes): for m in range(tel.mmax + 1): if np.sum(self.sky_array[m]) != 0: sky_m = self.sky_array[m].reshape((tel.nfreq, tel.lmax + 1, 1)) - pse.q_estimator(m, sky_m, sky_m) + pse.q_estimator(m, sky_m) # Redistribute over m's and have bands not distributed pse.qa = pse.qa.redistribute(axis=0)