diff --git a/setup.cfg b/setup.cfg
index fa66459f..8bd80d5a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -27,7 +27,7 @@ install_requires =
mflike @ git+https://github.com/simonsobs/lat_mflike@master
[options.package_data]
-soliket = *.yaml,*.bibtex,clusters/data/*,clusters/data/selFn_equD56/*,lensing/data/*.txt,mflike/*.yaml,tests/*.yaml,data/xcorr_simulated/*.txt
+soliket = *.yaml,*.bibtex,clusters/data/*,clusters/data/selFn_equD56/*,lensing/data/*.txt,mflike/*.yaml,tests/*.yaml,data/xcorr_simulated/*.txt,data/CosmoPower/CP_paper/CMB/*.pkl
testpaths = "soliket"
text_file_format = rst
diff --git a/soliket/__init__.py b/soliket/__init__.py
index 204d928c..ead5c120 100644
--- a/soliket/__init__.py
+++ b/soliket/__init__.py
@@ -7,7 +7,7 @@
from .xcorr import XcorrLikelihood # noqa: F401
from .foreground import Foreground
from .bandpass import BandPass
-from .cosmopower import CosmoPower
+from .cosmopower import CosmoPower, CosmoPowerDerived
try:
from .clusters import ClusterLikelihood # noqa: F401
diff --git a/soliket/cosmopower.py b/soliket/cosmopower.py
index 29353360..349dbe56 100755
--- a/soliket/cosmopower.py
+++ b/soliket/cosmopower.py
@@ -1,3 +1,30 @@
+"""
+.. module:: soliket.cosmopower
+
+:Synopsis: Simple CosmoPower theory wrapper for Cobaya.
+:Author: Hidde T. Jense
+
+.. |br| raw:: html
+
+
+
+.. note::
+
+ **If you use this cosmological code, please cite:**
+ |br|
+ A. Spurio Mancini et al.
+ *CosmoPower: emulating cosmological power spectra for accelerated Bayesian
+ inference from next-generation surveys*
+ (`arXiv:210603846 `_)
+
+ And remember to cite any sources for trained networks you use.
+
+Usage
+-----
+
+After installing SOLikeT and cosmopower, you can use the ``CosmoPower`` theory codes
+by adding the ``soliket.CosmoPower`` code as a block in your parameter files.
+"""
import os
try:
import cosmopower as cp # noqa F401
@@ -7,100 +34,300 @@
HAS_COSMOPOWER = True
import numpy as np
+from typing import Dict, Iterable, Tuple
+
+from cobaya.log import LoggedError
+from cobaya.theory import Theory
from cobaya.theories.cosmo import BoltzmannBase
from cobaya.typing import InfoDict
-"""
- Simple CosmoPower theory wrapper for Cobaya.
- author: Hidde T. Jense
-"""
-
class CosmoPower(BoltzmannBase):
- soliket_data_path: str = "soliket/data/CosmoPower"
- network_path: str = "CP_paper/CMB"
- cmb_tt_nn_filename: str = "cmb_TT_NN"
- cmb_te_pcaplusnn_filename: str = "cmb_TE_PCAplusNN"
- cmb_ee_nn_filename: str = "cmb_EE_NN"
-
- extra_args: InfoDict = {}
-
- renames: dict = {
- "omega_b": ["ombh2", "omegabh2"],
- "omega_cdm": ["omch2", "omegach2"],
- "ln10^{10}A_s": ["logA"],
- "n_s": ["ns"],
- "h": [],
- "tau_reio": ["tau"],
- }
-
- def initialize(self):
+ """A CosmoPower Network wrapper for Cobaya."""
+
+ stop_at_error: bool = False
+
+ network_path: str = "soliket/data/CosmoPower/CP_paper/CMB"
+ network_settings: InfoDict = None
+
+ extra_args: InfoDict = None
+
+ def initialize(self) -> None:
super().initialize()
- base_path = os.path.join(self.soliket_data_path, self.network_path)
-
- self.cp_tt_nn = cp.cosmopower_NN(
- restore=True,
- restore_filename=os.path.join(base_path, self.cmb_tt_nn_filename),
- )
- self.cp_te_nn = cp.cosmopower_PCAplusNN(
- restore=True,
- restore_filename=os.path.join(base_path, self.cmb_te_pcaplusnn_filename),
- )
- self.cp_ee_nn = cp.cosmopower_NN(
- restore=True,
- restore_filename=os.path.join(base_path, self.cmb_ee_nn_filename),
- )
+ if self.network_settings is None:
+ raise LoggedError("No network settings were provided.")
+
+ self.networks = {}
+ self.all_parameters = set([])
+
+ for spectype in self.network_settings:
+ netdata = {}
+ nettype = self.network_settings[spectype]
+ netpath = os.path.join(self.network_path, nettype["filename"])
+
+ if nettype["type"] == "NN":
+ network = cp.cosmopower_NN(
+ restore=True, restore_filename=netpath)
+ elif nettype["type"] == "PCAplusNN":
+ network = cp.cosmopower_PCAplusNN(
+ restore=True, restore_filename=netpath)
+ elif self.stop_at_error:
+ raise ValueError(
+ f"Unknown network type {nettype['type']} for network {spectype}.")
+ else:
+ self.log.warn(
+ f"Unknown network type {nettype['type']}\
+ for network {spectype}: skipped!")
+
+ netdata["type"] = nettype["type"]
+ netdata["log"] = nettype.get("log", True)
+ netdata["network"] = network
+ netdata["parameters"] = list(network.parameters)
+ netdata["lmax"] = network.modes.max()
+ netdata["has_ell_factor"] = nettype.get("has_ell_factor", False)
+
+ self.all_parameters = self.all_parameters | set(network.parameters)
+
+ if network is not None:
+ self.networks[spectype.lower()] = netdata
if "lmax" not in self.extra_args:
self.extra_args["lmax"] = None
self.log.info(f"Loaded CosmoPower from directory {self.network_path}")
+ self.log.info(
+ f"CosmoPower will expect the parameters {self.all_parameters}")
- def calculate(self, state, want_derived=True, **params):
- cmb_params = {}
+ def calculate(self, state: dict, want_derived: bool = True, **params) -> bool:
+ ## sadly, this syntax not valid until python 3.9
+ # cmb_params = {
+ # p: [params[p]] for p in params
+ # } | {
+ # self.translate_param(p): [params[p]] for p in params
+ # }
+ cmb_params = {**{
+ p: [params[p]] for p in params
+ }, **{
+ self.translate_param(p): [params[p]] for p in params
+ }}
- for par in self.renames:
- if par in params:
- cmb_params[par] = [params[par]]
+ ells = None
+
+ for spectype in self.networks:
+ network = self.networks[spectype]
+ used_params = {par: (cmb_params[par] if par in cmb_params else [
+ params[par]]) for par in network["parameters"]}
+
+ if network["log"]:
+ data = network["network"].ten_to_predictions_np(used_params)[
+ 0, :]
else:
- for r in self.renames[par]:
- if r in params:
- cmb_params[par] = [params[r]]
- break
+ data = network["network"].predictions_np(used_params)[0, :]
+
+ state[spectype] = data
+
+ if ells is None:
+ ells = network["network"].modes
- state["tt"] = self.cp_tt_nn.ten_to_predictions_np(cmb_params)[0, :]
- state["te"] = self.cp_te_nn.predictions_np(cmb_params)[0, :]
- state["ee"] = self.cp_ee_nn.ten_to_predictions_np(cmb_params)[0, :]
- state["ell"] = self.cp_tt_nn.modes
+ state["ell"] = ells.astype(int)
- def get_Cl(self, ell_factor=False, units="FIRASmuK2"):
+ return True
+
+ def get_Cl(self, ell_factor: bool = False, units: str = "FIRASmuK2") -> dict:
cls_old = self.current_state.copy()
- lmax = self.extra_args["lmax"] or (cls_old["tt"].shape[0] + 2)
+ lmax = self.extra_args["lmax"] or cls_old["ell"].max()
- cls = {"ell": np.arange(lmax).astype(int)}
+ cls = {"ell": np.arange(lmax + 1).astype(int)}
ls = cls_old["ell"]
- for k in ["tt", "te", "ee"]:
- # cls[k] = np.zeros(cls["ell"].shape, dtype=float)
- cls[k] = np.empty(cls["ell"].shape, dtype=float)
- cls[k][:] = np.nan
+ for k in self.networks:
+ cls[k] = np.tile(np.nan, cls["ell"].shape)
- cmb_fac = self._cmb_unit_factor(units, 2.7255)
+ for k in self.networks:
+ prefac = np.ones_like(ls).astype(float)
- if ell_factor:
- ls_fac = ls * (ls + 1.0) / (2.0 * np.pi)
- else:
- ls_fac = 1.0
+ if self.networks[k]["has_ell_factor"]:
+ prefac /= self.ell_factor(ls, k)
+ if ell_factor:
+ prefac *= self.ell_factor(ls, k)
- for k in ["tt", "te", "ee"]:
- cls[k][ls] = cls_old[k] * ls_fac * cmb_fac ** 2.0
+ cls[k][ls] = cls_old[k] * prefac * \
+ self.cmb_unit_factor(k, units, 2.7255)
+ cls[k][:2] = 0.0
if np.any(np.isnan(cls[k])):
- self.log.warning("CosmoPower used outside of trained "\
+ self.log.warning("CosmoPower used outside of trained "
"{} ell range. Filled in with NaNs.".format(k))
return cls
- def get_can_support_params(self):
- return ["omega_b", "omega_cdm", "h", "logA", "ns", "tau_reio"]
+ def ell_factor(self, ls: np.ndarray, spectra: str) -> np.ndarray:
+ """
+ Calculate the ell factor for a specific spectrum.
+ These prefactors are used to convert from Cell to Dell and vice-versa.
+
+ See also:
+ cobaya.BoltzmannBase.get_Cl
+ `camb.CAMBresults.get_cmb_power_spectra `_ # noqa E501
+
+ Example:
+ ell_factor(l, "tt") -> l(l+1)/(2 pi).
+ ell_factor(l, "pp") -> l^2(l+1)^2/(2 pi).
+
+ :param ls: the range of ells.
+ :param spectra: a two-character string with each character being one of [tebp].
+
+ :return: an array filled with ell factors for the given spectrum.
+ """
+ ellfac = np.ones_like(ls).astype(float)
+
+ if spectra in ["tt", "te", "tb", "ee", "et", "eb", "bb", "bt", "be"]:
+ ellfac = ls * (ls + 1.0) / (2.0 * np.pi)
+ elif spectra in ["pt", "pe", "pb", "tp", "ep", "bp"]:
+ ellfac = (ls * (ls + 1.0)) ** (3. / 2.) / (2.0 * np.pi)
+ elif spectra in ["pp"]:
+ ellfac = (ls * (ls + 1.0)) ** 2.0 / (2.0 * np.pi)
+
+ return ellfac
+
+ def cmb_unit_factor(self, spectra: str,
+ units: str = "FIRASmuK2",
+ Tcmb: float = 2.7255) -> float:
+ """
+ Calculate the CMB prefactor for going from dimensionless power spectra to
+ CMB units.
+
+ :param spectra: a length 2 string specifying the spectrum for which to
+ calculate the units.
+ :param units: a string specifying which units to use.
+ :param Tcmb: the used CMB temperature [units of K].
+ :return: The CMB unit conversion factor.
+ """
+ res = 1.0
+ x, y = spectra.lower()
+
+ if x == "t" or x == "e" or x == "b":
+ res *= self._cmb_unit_factor(units, Tcmb)
+ elif x == "p":
+ res *= 1. / np.sqrt(2.0 * np.pi)
+
+ if y == "t" or y == "e" or y == "b":
+ res *= self._cmb_unit_factor(units, Tcmb)
+ elif y == "p":
+ res *= 1. / np.sqrt(2.0 * np.pi)
+
+ return res
+
+ def get_can_support_parameters(self) -> Iterable[str]:
+ return self.all_parameters
+
+ def get_requirements(self) -> Iterable[Tuple[str, str]]:
+ requirements = []
+ for k in self.all_parameters:
+ if k in self.renames.values():
+ for v in self.renames:
+ if self.renames[v] == k:
+ requirements.append((v, None))
+ break
+ else:
+ requirements.append((k, None))
+
+ return requirements
+
+
+class CosmoPowerDerived(Theory):
+ """A theory class that can calculate derived parameters from CosmoPower networks."""
+ stop_at_error: bool = False
+
+ network_path: str = "soliket/data/CosmoPower/CP_paper/CMB"
+ network_settings: InfoDict = None
+
+ derived_parameters: Iterable[str]
+ derived_values: Dict
+
+ extra_args: InfoDict = None
+
+ renames: Dict = {}
+
+ def initialize(self) -> None:
+ super().initialize()
+
+ if self.network_settings is None:
+ raise LoggedError("No network settings were provided.")
+
+ netpath = os.path.join(self.network_path, self.network_settings["filename"])
+
+ if self.network_settings["type"] == "NN":
+ self.network = cp.cosmopower_NN(
+ restore=True, restore_filename=netpath)
+ elif self.network_settings["type"] == "PCAplusNN":
+ self.network = cp.cosmopower_PCAplusNN(
+ restore=True, restore_filename=netpath)
+ else:
+ raise LoggedError(
+ f"Unknown network type {self.network_settings['type']}.")
+
+ self.input_parameters = set(self.network.parameters)
+
+ self.log_data = self.network_settings.get("log", False)
+
+ self.log.info(
+ f"Loaded CosmoPowerDerived from directory {self.network_path}")
+ self.log.info(
+ f"CosmoPowerDerived will expect the parameters {self.input_parameters}")
+ self.log.info(
+ f"CosmoPowerDerived can provide the following parameters: \
+ {self.get_can_provide()}.")
+
+ def translate_param(self, p):
+ return self.renames.get(p, p)
+
+ def calculate(self, state: dict, want_derived: bool = True, **params) -> bool:
+ ## sadly, this syntax not valid until python 3.9
+ # input_params = {
+ # p: [params[p]] for p in params
+ # } | {
+ # self.translate_param(p): [params[p]] for p in params
+ # }
+ input_params = {**{
+ p: [params[p]] for p in params
+ }, **{
+ self.translate_param(p): [params[p]] for p in params
+ }}
+
+ if self.log_data:
+ data = self.network.ten_to_predictions_np(input_params)[0, :]
+ else:
+ data = self.network.predictions_np(input_params)[0, :]
+
+ for k, v in zip(self.derived_parameters, data):
+ if len(k) == 0 or k == "_":
+ continue
+
+ state["derived"][k] = v
+
+ return True
+
+ def get_param(self, p) -> float:
+ return self.current_state["derived"][self.translate_param(p)]
+
+ def get_can_support_parameters(self) -> Iterable[str]:
+ return self.input_parameters
+
+ def get_requirements(self) -> Iterable[Tuple[str, str]]:
+ requirements = []
+ for k in self.input_parameters:
+ if k in self.renames.values():
+ for v in self.renames:
+ if self.renames[v] == k:
+ requirements.append((v, None))
+ break
+ else:
+ requirements.append((k, None))
+
+ return requirements
+
+ def get_can_provide(self) -> Iterable[str]:
+ return set([par for par in self.derived_parameters
+ if (len(par) > 0 and not par == "_")])
diff --git a/soliket/tests/test_cosmopower.py b/soliket/tests/test_cosmopower.py
index 1ef05efa..55e90a6d 100644
--- a/soliket/tests/test_cosmopower.py
+++ b/soliket/tests/test_cosmopower.py
@@ -30,11 +30,37 @@
},
"theory": {
"soliket.CosmoPower": {
- # "soliket_data_path": os.path.normpath(
- # os.path.join(os.getcwd(), "../data/CosmoPower")
- # ),
- "soliket_data_path": "soliket/data/CosmoPower",
"stop_at_error": True,
+ "network_settings": {
+ "tt": {
+ "type": "NN",
+ "log": True,
+ "filename": "cmb_TT_NN",
+ # If your network has been trained on (l (l+1) / 2 pi) C_l,
+ # this flag needs to be set.
+ "has_ell_factor": False,
+ },
+ "ee": {
+ "type": "NN",
+ "log": True,
+ "filename": "cmb_EE_NN",
+ "has_ell_factor": False,
+ },
+ "te": {
+ "type": "PCAplusNN",
+ # Trained on Cl, not log(Cl)
+ "log": False,
+ "filename": "cmb_TE_PCAplusNN",
+ "has_ell_factor": False,
+ },
+ },
+ "renames": {
+ "ombh2": "omega_b",
+ "omch2": "omega_cdm",
+ "ns": "n_s",
+ "logA": "ln10^{10}A_s",
+ "tau": "tau_reio"
+ }
}
},
}
@@ -42,15 +68,16 @@
@pytest.mark.skipif(not HAS_COSMOPOWER, reason='test requires cosmopower')
def test_cosmopower_theory(request):
- info_dict['theory']['soliket.CosmoPower']['soliket_data_path'] = \
- os.path.join(request.config.rootdir, 'soliket/data/CosmoPower')
+ info_dict['theory']['soliket.CosmoPower']['network_path'] = \
+ os.path.join(request.config.rootdir, 'soliket/data/CosmoPower/CP_paper/CMB')
+
model_fiducial = get_model(info_dict) # noqa F841
@pytest.mark.skipif(not HAS_COSMOPOWER, reason='test requires cosmopower')
def test_cosmopower_loglike(request):
- info_dict['theory']['soliket.CosmoPower']['soliket_data_path'] = \
- os.path.join(request.config.rootdir, 'soliket/data/CosmoPower')
+ info_dict['theory']['soliket.CosmoPower']['network_path'] = \
+ os.path.join(request.config.rootdir, 'soliket/data/CosmoPower/CP_paper/CMB')
model_cp = get_model(info_dict)
logL_cp = float(model_cp.loglikes({})[0])
@@ -67,12 +94,37 @@ def test_cosmopower_against_camb(request):
camb_cls = model_camb.theory['camb'].get_Cl()
info_dict['theory'] = {
- "soliket.CosmoPower": {
- "soliket_data_path": os.path.join(request.config.rootdir,
- 'soliket/data/CosmoPower'),
- "stop_at_error": True,
- "extra_args": {'lmax': camb_cls['ell'].max() + 1}}
+ "soliket.CosmoPower": {
+ "stop_at_error": True,
+ "extra_args": {'lmax': camb_cls['ell'].max()},
+ 'network_path': os.path.join(request.config.rootdir,
+ 'soliket/data/CosmoPower/CP_paper/CMB'),
+ "network_settings": {
+ "tt": {
+ "type": "NN",
+ "log": True,
+ "filename": "cmb_TT_NN"
+ },
+ "ee": {
+ "type": "NN",
+ "log": True,
+ "filename": "cmb_EE_NN"
+ },
+ "te": {
+ "type": "PCAplusNN",
+ "log": False,
+ "filename": "cmb_TE_PCAplusNN"
+ },
+ },
+ "renames": {
+ "ombh2": "omega_b",
+ "omch2": "omega_cdm",
+ "ns": "n_s",
+ "logA": "ln10^{10}A_s",
+ "tau": "tau_reio"
+ }
}
+ }
model_cp = get_model(info_dict)
logL_cp = float(model_cp.loglikes({})[0])