Ch/psestimate memory #96

Open · wants to merge 6 commits into base: master · Changes from 3 commits
101 changes: 100 additions & 1 deletion draco/analysis/powerspectrum.py
@@ -9,8 +9,10 @@

 import numpy as np
+import time
+from mpi4py import MPI

-from caput import config
+from caput import config, mpiarray, mpiutil
 from ..core import task, containers

@@ -69,13 +71,110 @@ def process(self, klmodes):

        q_list = []

        st = time.time()

        for mi, m in klmodes.vis[:].enumerate(axis=0):
            ps_single = pse.q_estimator(m, klmodes.vis[m, : klmodes.nmode[m]])
            q_list.append(ps_single)

        q = klmodes.comm.allgather(np.array(q_list).sum(axis=0))
        q = np.array(q).sum(axis=0)

        et = time.time()
        if mpiutil.rank0:
            print("m, time needed for quadratic pse", m, et - st)
Comment on lines +77 to +78

Contributor: This looks like leftover debug statements. Either remove them, or at least improve the formatting.

Contributor: Would be good to keep, but agree about improving formatting.

Comment on lines +76 to +78

Contributor, suggested change:

-        et = time.time()
-        if mpiutil.rank0:
-            print("m, time needed for quadratic pse", m, et - st)
+        et = time.time()
+        self.log.debug(f"{m=}, time needed for quadratic pse {et - st}")
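For context on the suggestion above: the {m=} form is Python 3.8+ self-documenting f-string syntax, which expands to the expression text followed by its value. A minimal standalone illustration (the values here are stand-ins, not from the PR):

m, st, et = 3, 1.0, 2.5
print(f"{m=}, time needed for quadratic pse {et - st}")
# prints: m=3, time needed for quadratic pse 1.5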


        # Read the precomputed Fisher matrix and bias from the products directory
        fisher, bias = pse.fisher_bias()

        ps = containers.Powerspectrum2D(
            kperp_edges=pse.kperp_bands, kpar_edges=pse.kpar_bands
        )

        npar = len(ps.index_map["kpar"])
        nperp = len(ps.index_map["kperp"])

        # Calculate the right unmixing matrix for each ps type
        if self.pstype == "unwindowed":
            # Regularised pseudo-inverse of the Fisher matrix
            M = la.pinv(fisher, rcond=1e-8)
        elif self.pstype == "uncorrelated":
            # Cholesky factor of the Fisher matrix, rows normalised by their sums
            Fh = la.cholesky(fisher)
            M = la.inv(Fh) / Fh.sum(axis=1)[:, np.newaxis]
        elif self.pstype == "minimum_variance":
            # Normalise each band by the row sum of the Fisher matrix
            M = np.diag(fisher.sum(axis=1) ** -1)

        ps.powerspectrum[:] = np.dot(M, q - bias).reshape(nperp, npar)
        ps.C_inv[:] = fisher.reshape(nperp, npar, nperp, npar)

        return ps
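A hedged reading of the three pstype branches above, inferred from the code itself rather than stated anywhere in the diff: each branch builds an unmixing matrix M and forms band powers as

    \hat{p} = M (\hat{q} - b), \qquad W = M F,

where F is the Fisher matrix, b the bias, \hat{q} the quadratic estimates, and W the window matrix. In this notation:

  • unwindowed: M = F^{+}, a regularised pseudo-inverse, so W \approx I;
  • uncorrelated: F = F_h^T F_h via la.cholesky, with M = F_h^{-1} and each row divided by the corresponding row sum of F_h;
  • minimum_variance: M = \mathrm{diag}(1 / \textstyle\sum_b F_{ab}), so each row of W sums to one.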


class QuadraticPSEstimationXLarge(QuadraticPSEstimation):
Contributor: Why does this need to be a separate class at all? Is there any reason this can't just replace the existing implementation? Or if not, merge them and change the behaviour to a config option, or even better, something that gets automatically determined based on the input data size.

Contributor Author (@cahofer, Feb 5, 2021): I mean, I could put them into one class and add a config option, but the two classes are fundamentally very different, for the following reasons (see the sketch after this list):

  • QuadraticPSEstimationXLarge uses the PSMonteCarloLarge class from driftscan, in which the clarray and the q estimators are MPIArrays parallelized over power spectrum bands. This is very different from the original code, in which clarray is gathered on all ranks and the q estimators are calculated locally.
  • This class respects the fact that clarray and the q estimators are parallelized over bands, therefore the sky data need to be precomputed for all m first, as each rank needs all m's. This is also very different from the original code, where the sky data were handled in parallel over m.

If we were to use a config option, then we would have one class but an if/else statement in the process method handling the two different calculations.

I personally thought a separate class is cleaner, but after all - you are the boss ;)
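To make the distinction concrete, a minimal sketch (not part of the PR; shapes and names are illustrative) of the two layouts, using only the mpiarray calls that appear in this diff:

import numpy as np
from mpi4py import MPI
from caput import mpiarray

mmax, nbands = 16, 4

# Original approach: arrays distributed over the m axis (axis=0); each rank
# works on its local m's and results are gathered across ranks afterwards.
q_over_m = mpiarray.MPIArray(
    (mmax + 1, nbands), axis=0, comm=MPI.COMM_WORLD, dtype=np.float64
)

# New approach: arrays distributed over the band axis (axis=1); every rank
# needs the sky data for all m, so those are precomputed and allgathered first.
q_over_bands = mpiarray.MPIArray(
    (mmax + 1, nbands), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64
)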

"""Quadratic PS estimation for large arrays"""

def process(self, klmodes):
"""Estimate the power spectrum from the given data.

Parameters
----------
klmodes : containers.KLModes

Returns
-------
ps : containers.PowerSpectrum
"""

import scipy.linalg as la

if not isinstance(klmodes, containers.KLModes):
raise ValueError(
"Input container must be instance of "
"KLModes (received %s)" % klmodes.__class__
)

klmodes.redistribute("m")

pse = self.manager.psestimators[self.psname]
pse.genbands()

tel = self.manager.telescope

self.sky_array = mpiarray.MPIArray(
(tel.mmax + 1, tel.nfreq, tel.lmax + 1),
axis=0,
comm=MPI.COMM_WORLD,
dtype=np.complex128,
)

self.sky_array[:] = 0.0

for mi, m in klmodes.vis[:].enumerate(axis=0):
sky_vec1, sky_vec2 = pse.project_vector_kl_to_sky(
m, klmodes.vis[m, : klmodes.nmode[m]]
)
if (sky_vec1.shape[0], sky_vec2.shape[0]) != (0, 0):
self.sky_array[mi] = sky_vec1

self.sky_array = self.sky_array.allgather()

pse.qa = mpiarray.MPIArray(
(tel.mmax + 1, pse.nbands, 1), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64
)

pse.qa[:] = 0.0

for m in range(tel.mmax + 1):
if np.sum(self.sky_array[m]) != 0:
sky_m = self.sky_array[m].reshape((tel.nfreq, tel.lmax + 1, 1))
pse.q_estimator(m, sky_m, sky_m)

# Redistribute over m's and have bands not distributed
pse.qa = pse.qa.redistribute(axis=0)
# Sum over local m's for all bands
q_local = np.sum(np.array(pse.qa[:]), axis=0)
# Collect all m on each rank
q = mpiutil.allreduce(q_local[:, 0], op=MPI.SUM)

        # Read the precomputed Fisher matrix and bias from the products directory
        fisher, bias = pse.fisher_bias()

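A minimal, self-contained sketch (shapes and fill values are stand-ins, not from the PR) of the reduction pattern at the end of the method above: qa starts distributed over bands while every rank fills all m, so it is redistributed over m, partially summed locally, and combined with an allreduce.

import numpy as np
from mpi4py import MPI
from caput import mpiarray, mpiutil

mmax, nbands = 16, 4

# qa distributed over the band axis; every rank holds all m for its bands.
qa = mpiarray.MPIArray(
    (mmax + 1, nbands, 1), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64
)
qa[:] = 1.0  # stand-in for per-(m, band) estimator values

# Redistribute over m so each rank holds a slice of m's but all bands ...
qa = qa.redistribute(axis=0)
# ... sum this rank's local m's for every band ...
q_local = np.sum(np.array(qa[:]), axis=0)
# ... and combine the partial sums from all ranks.
q = mpiutil.allreduce(q_local[:, 0], op=MPI.SUM)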