diff --git a/src/multi_condition_comparisions/__init__.py b/src/multi_condition_comparisions/__init__.py
index 35fd6a6..275ac32 100644
--- a/src/multi_condition_comparisions/__init__.py
+++ b/src/multi_condition_comparisions/__init__.py
@@ -1,7 +1,7 @@
 from importlib.metadata import version

-from . import pl, pp, tl
+from . import methods, pl

-__all__ = ["pl", "pp", "tl"]
+__all__ = ["pl", "methods"]

 __version__ = version("multi-condition-comparisions")
diff --git a/src/multi_condition_comparisions/_util.py b/src/multi_condition_comparisions/_util.py
new file mode 100644
index 0000000..2846296
--- /dev/null
+++ b/src/multi_condition_comparisions/_util.py
@@ -0,0 +1,27 @@
+import numpy as np
+from scipy.sparse import issparse, spmatrix
+
+
+def check_is_integer_matrix(array: np.ndarray | spmatrix, tolerance: float = 1e-6) -> None:
+    """Check that a matrix contains integers, or floats that are close to integers.
+
+    Parameters
+    ----------
+    array
+        dense or sparse matrix to check
+    tolerance
+        Values must be this close to integers
+
+    Raises
+    ------
+    ValueError
+        if the matrix contains values that are not close to integers
+    """
+    if issparse(array):
+        if array.data.dtype.kind != "i" and not np.all(np.abs(array.data - np.round(array.data)) < tolerance):
+            raise ValueError("Non-zero elements of the matrix must be close to integer values.")
+    else:
+        if array.dtype.kind != "i" and not np.all(np.abs(array - np.round(array)) < tolerance):
+            raise ValueError("Matrix must contain only integers, or floats close to integers.")
+        if (array < 0).sum() > 0:
+            raise ValueError("Matrix must not contain negative values.")
diff --git a/src/multi_condition_comparisions/methods/__init__.py b/src/multi_condition_comparisions/methods/__init__.py
new file mode 100644
index 0000000..76fbb8c
--- /dev/null
+++ b/src/multi_condition_comparisions/methods/__init__.py
@@ -0,0 +1,7 @@
+from ._base import ContrastType, LinearModelBase, MethodBase
+from ._edger import EdgeR
+from ._pydeseq2 import PyDESeq2
+from ._simple_tests import WilcoxonTest
+from ._statsmodels import Statsmodels
+
+__all__ = ["MethodBase", "LinearModelBase", "EdgeR", "PyDESeq2", "Statsmodels", "WilcoxonTest", "ContrastType"]
diff --git a/src/multi_condition_comparisions/methods/_base.py b/src/multi_condition_comparisions/methods/_base.py
new file mode 100644
index 0000000..88c0598
--- /dev/null
+++ b/src/multi_condition_comparisions/methods/_base.py
@@ -0,0 +1,333 @@
+import re
+from abc import ABC, abstractmethod
+from collections.abc import Sequence
+from dataclasses import dataclass
+
+import numpy as np
+import pandas as pd
+from anndata import AnnData
+from formulaic import model_matrix
+from formulaic.model_matrix import ModelMatrix
+
+
+@dataclass
+class Contrast:
+    """Simple contrast for comparison between groups"""
+
+    column: str
+    baseline: str
+    group_to_compare: str
+
+
+ContrastType = Contrast | tuple[str, str, str]
+
+
+class MethodBase(ABC):
+    def __init__(
+        self,
+        adata: AnnData,
+        *,
+        mask: str | None = None,
+        layer: str | None = None,
+        **kwargs,
+    ):
+        """
+        Initialize the method
+
+        Parameters
+        ----------
+        adata
+            AnnData object, usually pseudobulked.
+        mask
+            A column in adata.var that contains a boolean mask with selected features.
+        layer
+            Layer to use in fit(). If None, use the X array.
+ **kwargs + Keyword arguments specific to the method implementation + """ + self.adata = adata + if mask is not None: + self.adata = self.adata[:, self.adata.var[mask]] + + self.layer = layer + + # Do some sanity checks on the input. Do them after the mask is applied. + # Check that counts have no NaN or Inf values. + if np.any(~np.isfinite(self.data)): + raise ValueError("Counts cannot contain negative, NaN or Inf values.") + # Check that counts have numeric values. + if not np.issubdtype(self.adata.X.dtype, np.number): + raise ValueError("Counts must be numeric.") + + @property + def data(self): + """Get the data matrix from anndata this object was initalized with (X or layer)""" + if self.layer is None: + return self.adata.X + else: + return self.adata.layer[self.layer] + + @classmethod + @abstractmethod + def compare_groups( + cls, + adata: AnnData, + column: str, + baseline: str, + groups_to_compare: str | Sequence[str], + *, + paired_by: str = None, + mask: str | None = None, + layer: str | None = None, + ) -> pd.DataFrame: + """ + Compare between groups in a specified column. + + This interface is expected to be provided by all methods. Methods can provide other interfaces + on top, see e.g. {class}`LinearModelBase`. This is a high-level interface that is kept simple on purpose and + only supports comparisons between groups on a single column at a time. For more complex designs, + please use the LinearModel method classes directly. + + Parameters + ---------- + adata + AnnData object + column + column in obs that contains the grouping information + baseline + baseline value (one category from variable). If set to "None" this refers to "all other categories". + groups_to_compare + One or multiple categories from variable to compare against baseline. Setting this to None refers to + "all categories" + paired_by + Column from `obs` that contains information about paired sample (e.g. subject_id) + mask + Subset anndata by a boolean mask stored in this column in `.obs` before making any tests + layer + Use this layer instead of `.X`. + + Returns + ------- + Pandas dataframe with results ordered by significance. If multiple comparisons were performed this + is indicated in an additional column. + """ + ... + + +class LinearModelBase(MethodBase): + """Base class for DE methods that have a linear model interface (i.e. support design matrices and contrast testing)""" + + def __init__( + self, + adata: AnnData, + design: str | np.ndarray, + *, + mask: str | None = None, + layer: str | None = None, + **kwargs, + ): + """ + Initialize the method + + Parameters + ---------- + adata + AnnData object, usually pseudobulked. + design + Model design. Can be either a design matrix, a formulaic formula.Formulaic formula in the format 'x + z' or '~x+z'. + mask + A column in adata.var that contains a boolean mask with selected features. + layer + Layer to use in fit(). If None, use the X array. 
+ **kwargs + Keyword arguments specific to the method implementation + """ + super().__init__(adata, mask=mask, layer=layer) + self._check_counts() + + if isinstance(design, str): + self.design = model_matrix(design, adata.obs) + else: + self.design = design + + @classmethod + def compare_groups( + cls, + adata: AnnData, + column: str, + baseline: str, + groups_to_compare: str | Sequence[str], + *, + paired_by: str | None = None, + mask: str | None = None, + layer: str | None = None, + fit_kwargs: dict = None, + test_kwargs: dict = None, + ) -> pd.DataFrame: + if test_kwargs is None: + test_kwargs = {} + if fit_kwargs is None: + fit_kwargs = {} + design = f"~{column}" + if paired_by is not None: + design += f"+{paired_by}" + if isinstance(groups_to_compare, str): + groups_to_compare = [groups_to_compare] + model = cls(adata, design=design, mask=mask, layer=layer) + + ## Fit model + model.fit(**fit_kwargs) + + ## Test contrasts + de_res = model.test_contrasts( + { + group_to_compare: model.contrast(column=column, baseline=baseline, group_to_compare=group_to_compare) + for group_to_compare in groups_to_compare + }, + **test_kwargs, + ) + + return de_res + + @property + def variables(self): + """Get the names of the variables used in the model definition""" + try: + return self.design.model_spec.variables_by_source["data"] + except AttributeError: + raise ValueError( + "Retrieving variables is only possible if the model was initialized using a formula." + ) from None + + @abstractmethod + def _check_counts(self) -> None: + """ + Check that counts are valid for the specific method. + + Different methods may have different requirements. + + Raises + ------ + ValueError + if the data matrix does not comply with the expectations + """ + ... + + @abstractmethod + def fit(self, **kwargs) -> None: + """ + Fit the model + + Parameters + ---------- + **kwargs + Additional arguments for fitting the specific method. + """ + ... + + @abstractmethod + def _test_single_contrast(self, contrast, **kwargs) -> pd.DataFrame: + ... + + def test_contrasts( + self, contrasts: list[str] | dict[str, np.ndarray] | dict[str, list] | np.ndarray, **kwargs + ) -> pd.DataFrame: + """ + Conduct a specific test. Please use :method:`contrast` to build the contrasts instead of building it on your own. + + Parameters + ---------- + contrasts: + either a single contrast, or a dictionary of contrasts where the key is the name for that particular contrast. + Each contrast can be either a vector of coefficients (the most general case), a string, or a some fancy DSL + (details still need to be figured out). + + or a tuple with three elements contrasts = ("condition", "control", "treatment") + """ + if not isinstance(contrasts, dict): + contrasts = {None: contrasts} + results = [] + for name, contrast in contrasts.items(): + results.append(self._test_single_contrast(contrast, **kwargs).assign(contrast=name)) + + results_df = pd.concat(results) + results_df.rename( + columns={"pvalue": "pvals", "padj": "pvals_adj", "log2FoldChange": "logfoldchanges"}, inplace=True + ) + + return results_df + + def test_reduced(self, modelB: "MethodBase") -> pd.DataFrame: + """ + Test against a reduced model + + Parameters + ---------- + modelB: + the reduced model against which to test. 
+ + Example: + -------- + ``` + modelA = Model().fit() + modelB = Model().fit() + modelA.test_reduced(modelB) + ``` + """ + raise NotImplementedError + + def cond(self, **kwargs) -> np.ndarray: + """ + The intention is to make contrasts using this function as in glmGamPoi + + >>> res < -test_de( + ... fit, contrast=cond(cell="B cells", condition="stim") - cond(cell="B cells", condition="ctrl") + ... ) + + Parameters + ---------- + **kwargs + + """ + + # TODO this is hacky - reach out to formulaic authors how to do this properly + def _get_var_from_colname(colname): + regex = re.compile(r"^.+\[T\.(.+)\]$") + return regex.search(colname).groups()[0] + + if not isinstance(self.design, ModelMatrix): + raise RuntimeError( + "Building contrasts with `cond` only works if you specified the model using a " + "formulaic formula. Please manually provide a contrast vector." + ) + cond_dict = kwargs + for var in self.variables: + var_type = self.design.model_spec.encoder_state[var][0].value + if var_type == "categorical": + all_categories = set(self.design.model_spec.encoder_state[var][1]["categories"]) + if var in kwargs: + if var_type == "categorical" and kwargs[var] not in all_categories: + raise ValueError( + f"You specified a non-existant category for {var}. Possible categories: {', '.join(all_categories)}" + ) + else: + # fill with default values + if var_type != "categorical": + cond_dict[var] = 0 + else: + var_cols = self.design.columns[self.design.columns.str.startswith(f"{var}[")] + + present_categories = {_get_var_from_colname(x) for x in var_cols} + dropped_category = all_categories - present_categories + assert len(dropped_category) == 1 + cond_dict[var] = next(iter(dropped_category)) + + df = pd.DataFrame([kwargs]) + + return self.design.model_spec.get_model_matrix(df) + + def contrast(self, column: str, baseline: str, group_to_compare: str) -> list: + """Build a simple contrast for pairwise comparisons. In the future all methods should be able to accept the output of :method:`StatsmodelsDE.contrast` but alas a big TODO.""" + return [column, baseline, group_to_compare] diff --git a/src/multi_condition_comparisions/methods/_edger.py b/src/multi_condition_comparisions/methods/_edger.py new file mode 100644 index 0000000..26393af --- /dev/null +++ b/src/multi_condition_comparisions/methods/_edger.py @@ -0,0 +1,140 @@ +import numpy as np +import pandas as pd +from scanpy import logging +from scipy.sparse import issparse + +from multi_condition_comparisions._util import check_is_integer_matrix + +from ._base import LinearModelBase + + +class EdgeR(LinearModelBase): + """Differential expression test using EdgeR""" + + def _check_counts(self): + check_is_integer_matrix(self.data) + + def fit(self, **kwargs): # adata, design, mask, layer + """ + Fit model using edgeR. + + Note: this creates its own adata object for downstream. + + Params: + ---------- + **kwargs + Keyword arguments specific to glmQLFit() + ''' + + ## -- Check installations + """ + # For running in notebook + # pandas2ri.activate() + # rpy2.robjects.numpy2ri.activate() + + try: + import rpy2.robjects.numpy2ri + import rpy2.robjects.pandas2ri + from rpy2 import robjects as ro + from rpy2.robjects import numpy2ri, pandas2ri + from rpy2.robjects.conversion import localconverter + from rpy2.robjects.packages import importr + + pandas2ri.activate() + rpy2.robjects.numpy2ri.activate() + + except ImportError: + raise ImportError("edger requires rpy2 to be installed. 
") from None + + try: + edger = importr("edgeR") + except ImportError: + raise ImportError( + "edgeR requires a valid R installation with the following packages: " "edgeR, BiocParallel, RhpcBLASctl" + ) from None + + # Convert dataframe + with localconverter(ro.default_converter + numpy2ri.converter): + expr = self.adata.X if self.layer is None else self.adata.layers[self.layer] + if issparse(expr): + expr = expr.T.toarray() + else: + expr = expr.T + + expr_r = ro.conversion.py2rpy(pd.DataFrame(expr, index=self.adata.var_names, columns=self.adata.obs_names)) + + # Convert to DGE + dge = edger.DGEList(counts=expr_r, samples=self.adata.obs) + + # Run EdgeR + logging.info("Calculating NormFactors") + dge = edger.calcNormFactors(dge) + + logging.info("Estimating Dispersions") + dge = edger.estimateDisp(dge, design=self.design) + + logging.info("Fitting linear model") + fit = edger.glmQLFit(dge, design=self.design, **kwargs) + + # Save object + ro.globalenv["fit"] = fit + self.fit = fit + + def _test_single_contrast(self, contrast: list[str], **kwargs) -> pd.DataFrame: + """ + Conduct test for each contrast and return a data frame + + Parameters + ---------- + contrast: + numpy array of integars indicating contrast + i.e. [-1, 0, 1, 0, 0] + """ + ## -- Check installations + # For running in notebook + # pandas2ri.activate() + # rpy2.robjects.numpy2ri.activate() + + # ToDo: + # parse **kwargs to R function + # Fix mask for .fit() + + try: + import rpy2.robjects.numpy2ri + import rpy2.robjects.pandas2ri # noqa: F401 + from rpy2 import robjects as ro + from rpy2.robjects import numpy2ri, pandas2ri # noqa: F401 + from rpy2.robjects.conversion import localconverter # noqa: F401 + from rpy2.robjects.packages import importr + + except ImportError: + raise ImportError("edger requires rpy2 to be installed.") from None + + try: + importr("edgeR") + except ImportError: + raise ImportError( + "edgeR requires a valid R installation with the following packages: " "edgeR, BiocParallel, RhpcBLASctl" + ) from None + + # Convert vector to R, which drops a category like `self.design_matrix` to use the intercept for the left out. 
+ contrast_vec = [0] * len(self.design.columns) + make_contrast_column_key = lambda ind: f"{contrast[0]}[T.{contrast[ind]}]" + for index in [1, 2]: + if make_contrast_column_key(index) in self.design.columns: + contrast_vec[self.design.columns.to_list().index(f"{contrast[0]}[T.{contrast[index]}]")] = 1 + contrast_vec_r = ro.conversion.py2rpy(np.asarray(contrast_vec)) + ro.globalenv["contrast_vec"] = contrast_vec_r + + # Test contrast with R + ro.r( + """ + test = edgeR::glmQLFTest(fit, contrast=contrast_vec) + de_res = edgeR::topTags(test, n=Inf, adjust.method="BH", sort.by="PValue")$table + """ + ) + + # Convert results to pandas + de_res = ro.conversion.rpy2py(ro.globalenv["de_res"]) + + return de_res.rename(columns={"PValue": "pvals", "logFC": "logfoldchanges", "FDR": "pvals_adj"}) diff --git a/src/multi_condition_comparisions/methods/_pydeseq2.py b/src/multi_condition_comparisions/methods/_pydeseq2.py new file mode 100644 index 0000000..439a44a --- /dev/null +++ b/src/multi_condition_comparisions/methods/_pydeseq2.py @@ -0,0 +1,67 @@ +import os +import re +import warnings + +import pandas as pd +from pydeseq2.dds import DeseqDataSet +from pydeseq2.default_inference import DefaultInference +from pydeseq2.ds import DeseqStats + +from multi_condition_comparisions._util import check_is_integer_matrix + +from ._base import LinearModelBase + + +class PyDESeq2(LinearModelBase): + """Differential expression test using a PyDESeq2""" + + def _check_counts(self): + check_is_integer_matrix(self.data) + + def fit(self, **kwargs) -> pd.DataFrame: + """ + Fit dds model using pydeseq2. + + Note: this creates its own AnnData object for downstream processing. + + Params: + ---------- + **kwargs + Keyword arguments specific to DeseqDataSet(), except for `n_cpus` which will use all available CPUs minus one if the argument is not passed. 
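+
+        Example
+        -------
+        A minimal sketch (assumes an AnnData object with a categorical "condition"
+        column in `.obs`, as used in the tests):
+
+        >>> model = PyDESeq2(adata, design="~condition")
+        >>> model.fit()
+        >>> res_df = model.test_contrasts(["condition", "A", "B"])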
+ """ + inference = DefaultInference(n_cpus=kwargs.pop("n_cpus", os.cpu_count() - 1)) + covars = self.design.columns.tolist() + if "Intercept" not in covars: + warnings.warn( + "Warning: Pydeseq is hard-coded to use Intercept, please include intercept into the model", stacklevel=2 + ) + processed_covars = list({re.sub(r"\[T\.(.*)\]", "", col) for col in covars if col != "Intercept"}) + dds = DeseqDataSet( + adata=self.adata, design_factors=processed_covars, refit_cooks=True, inference=inference, **kwargs + ) + # workaround code to insert design array + des_mtx_cols = dds.obsm["design_matrix"].columns + dds.obsm["design_matrix"] = self.design + if dds.obsm["design_matrix"].shape[1] == len(des_mtx_cols): + dds.obsm["design_matrix"].columns = des_mtx_cols.copy() + + dds.deseq2() + self.dds = dds + + def _test_single_contrast(self, contrast: list[str], alpha=0.05, **kwargs) -> pd.DataFrame: + """ + Conduct a specific test and returns a data frame + + Parameters + ---------- + contrasts: + list of three strings of the form + ["variable", "tested level", "reference level"] + alpha: p value threshold used for controlling fdr with + independent hypothesis weighting + kwargs: extra arguments to pass to DeseqStats() + """ + stat_res = DeseqStats(self.dds, contrast=contrast, alpha=alpha, **kwargs) + stat_res.summary() + stat_res.p_values + return pd.DataFrame(stat_res.results_df).sort_values("padj") diff --git a/src/multi_condition_comparisions/methods/_simple_tests.py b/src/multi_condition_comparisions/methods/_simple_tests.py new file mode 100644 index 0000000..ddf507b --- /dev/null +++ b/src/multi_condition_comparisions/methods/_simple_tests.py @@ -0,0 +1,107 @@ +"""Simple tests such as t-test, wilcoxon""" + +import warnings +from abc import abstractmethod +from collections.abc import Sequence + +import numpy as np +import pandas as pd +import scipy.stats +from anndata import AnnData +from pandas.core.api import DataFrame as DataFrame +from scipy.sparse import issparse +from tqdm.auto import tqdm + +from ._base import MethodBase + + +class SimpleComparisonBase(MethodBase): + @abstractmethod + def _compare_single_group( + self, baseline_idx: np.ndarray, comparison_idx: np.ndarray, *, paired: bool = False + ) -> pd.DataFrame: + """ + Perform a single comparison between two groups. + + Parameters + ---------- + baseline_idx + numeric indices indicating which observations are in the baseline group + comparison_idx + numeric indices indicating which observations are in the comparison/treatment group + paired + whether or not to perform a paired test. Note that in the case of a paired test, + the indices must be ordered such that paired observations appear at the same position. + """ + ... 
+ + @classmethod + def compare_groups( + cls, + adata: AnnData, + column: str, + baseline: str, + groups_to_compare: str | Sequence[str], + *, + paired_by: str | None = None, + mask: str | None = None, + layer: str | None = None, + fit_kwargs: dict = None, + test_kwargs: dict = None, + ) -> DataFrame: + if test_kwargs is None: + test_kwargs = {} + if fit_kwargs is None: + fit_kwargs = {} + if len(fit_kwargs) or len(test_kwargs): + warnings.warn("Simple tests do not use fit or test kwargs", UserWarning, stacklevel=2) + if paired_by is not None: + adata = adata.copy()[adata.obs.sort_values(paired_by).index, :] + model = cls(adata, mask=mask, layer=layer) + if isinstance(groups_to_compare, str): + groups_to_compare = [groups_to_compare] + + res_dfs = [] + for group_to_compare in groups_to_compare: + comparison_idx = np.where(adata.obs[column] == group_to_compare)[0] + if baseline is None: + baseline_idx = np.where(adata.obs[column] != group_to_compare)[0] + else: + baseline_idx = np.where(adata.obs[column] == baseline)[0] + res_dfs.append( + model._compare_single_group(baseline_idx, comparison_idx).assign( + comparison=f"{group_to_compare}_vs_{baseline if baseline is not None else 'rest'}" + ) + ) + return pd.concat(res_dfs) + + +class WilcoxonTest(SimpleComparisonBase): + """Perform a unpaired or paired Wilcoxon test. + + (the former is also known as "Mann-Whitney U test", the latter as "wilcoxon signed rank test") + """ + + def _compare_single_group( + self, baseline_idx: np.ndarray, comparison_idx: np.ndarray, *, paired: bool = False + ) -> DataFrame: + if paired: + assert len(baseline_idx) == len(comparison_idx), "For a paired test, indices must be of the same length" + test_fun = scipy.stats.wilcoxon + else: + test_fun = scipy.stats.mannwhitneyu + + # TODO can be made more efficient by converting CSR/CSC matrices accordingly + x0 = self.data[baseline_idx, :] + x1 = self.data[comparison_idx, :] + res = [] + for var in tqdm(self.adata.var_names): + tmp_x0 = x0[:, self.adata.var_names == var] + tmp_x0 = np.asarray(tmp_x0.todense()).flatten() if issparse(tmp_x0) else tmp_x0.flatten() + tmp_x1 = x1[:, self.adata.var_names == var] + tmp_x1 = np.asarray(tmp_x1.todense()).flatten() if issparse(tmp_x1) else tmp_x1.flatten() + pval = test_fun(x=tmp_x0, y=tmp_x1).pvalue + mean_x0 = np.asarray(np.mean(x0, axis=0)).flatten().astype(dtype=float) + mean_x1 = np.asarray(np.mean(x1, axis=0)).flatten().astype(dtype=float) + res.append({"variable": var, "pvals": pval, "fold_change": np.log(mean_x1) - np.log(mean_x0)}) + return pd.DataFrame(res).sort_values("pvals").set_index("variable") diff --git a/src/multi_condition_comparisions/methods/_statsmodels.py b/src/multi_condition_comparisions/methods/_statsmodels.py new file mode 100644 index 0000000..d23ce03 --- /dev/null +++ b/src/multi_condition_comparisions/methods/_statsmodels.py @@ -0,0 +1,79 @@ +import numpy as np +import pandas as pd +import scanpy as sc +import statsmodels +import statsmodels.api as sm +from tqdm.auto import tqdm + +from multi_condition_comparisions._util import check_is_integer_matrix + +from ._base import LinearModelBase + + +class Statsmodels(LinearModelBase): + """Differential expression test using a statsmodels linear regression""" + + def _check_counts(self): + check_is_integer_matrix(self.data) + + def fit( + self, + regression_model: sm.OLS | sm.GLM = sm.OLS, + **kwargs, + ) -> None: + """ + Fit the specified regression model. 
+ + Parameters + ---------- + regression_model + A statsmodels regression model class, either OLS or GLM. Defaults to OLS. + + **kwargs + Additional arguments for fitting the specific method. In particular, this + is where you can specify the family for GLM. + + Example + ------- + >>> import statsmodels.api as sm + >>> model = StatsmodelsDE(adata, design="~condition") + >>> model.fit(sm.GLM, family=sm.families.NegativeBinomial(link=sm.families.links.Log())) + >>> results = model.test_contrasts(np.array([0, 1])) + """ + self.models = [] + for var in tqdm(self.adata.var_names): + mod = regression_model( + sc.get.obs_df(self.adata, keys=[var], layer=self.layer)[var], + self.design, + **kwargs, + ) + mod = mod.fit() + self.models.append(mod) + + def _test_single_contrast(self, contrast, **kwargs) -> pd.DataFrame: + res = [] + for var, mod in zip(tqdm(self.adata.var_names), self.models, strict=False): + t_test = mod.t_test(contrast) + res.append( + { + "variable": var, + "pvalue": t_test.pvalue, + "tvalue": t_test.tvalue.item(), + "sd": t_test.sd.item(), + "logfoldchanges": t_test.effect.item(), + "padj": statsmodels.stats.multitest.fdrcorrection(np.array([t_test.pvalue]))[1].item(), + } + ) + return pd.DataFrame(res).sort_values("pvalue").set_index("variable") + + def contrast(self, column: str, baseline: str, group_to_compare: str) -> np.ndarray: + """ + Build a simple contrast for pairwise comparisons. + + This is equivalent to + + ``` + model.cond( = baseline) - model.cond( = group_to_compare) + ``` + """ + return self.cond(**{column: baseline}) - self.cond(**{column: group_to_compare}) diff --git a/src/multi_condition_comparisions/pl/__init__.py b/src/multi_condition_comparisions/pl/__init__.py index 4451023..e931c2f 100644 --- a/src/multi_condition_comparisions/pl/__init__.py +++ b/src/multi_condition_comparisions/pl/__init__.py @@ -1 +1,3 @@ from . import volcano + +__all__ = ["volcano"] diff --git a/src/multi_condition_comparisions/pp/__init__.py b/src/multi_condition_comparisions/pp/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/multi_condition_comparisions/tl/__init__.py b/src/multi_condition_comparisions/tl/__init__.py deleted file mode 100644 index 99fe18e..0000000 --- a/src/multi_condition_comparisions/tl/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from . import de -from .wrapper import run_de diff --git a/src/multi_condition_comparisions/tl/de.py b/src/multi_condition_comparisions/tl/de.py deleted file mode 100644 index d4c9b97..0000000 --- a/src/multi_condition_comparisions/tl/de.py +++ /dev/null @@ -1,477 +0,0 @@ -import re -import warnings -from abc import ABC, abstractmethod - -import numpy as np -import pandas as pd -import scanpy as sc -import statsmodels -import statsmodels.api as sm -from anndata import AnnData -from formulaic import model_matrix -from formulaic.model_matrix import ModelMatrix -from pydeseq2.dds import DeseqDataSet -from pydeseq2.default_inference import DefaultInference -from pydeseq2.ds import DeseqStats -from scanpy import logging -from scipy.sparse import issparse, spmatrix -from tqdm.auto import tqdm - - -class BaseMethod(ABC): - """Base method for DE testing that is implemented per DE test.""" - - def __init__( - self, - adata: AnnData, - design: str | np.ndarray, - mask: str | None = None, - layer: str | None = None, - **kwargs, - ): - """ - Initialize the method - - Parameters - ---------- - adata - AnnData object, usually pseudobulked. - design - Model design. 
Can be either a design matrix, a formulaic formula.Formulaic formula in the format 'x + z' or '~x+z'. - mask - A column in adata.var that contains a boolean mask with selected features. - layer - Layer to use in fit(). If None, use the X array. - **kwargs - Keyword arguments specific to the method implementation - """ - self.adata = adata - if mask is not None: - self.adata = self.adata[:, self.adata.var[mask]] - - # Do some sanity checks on the input. Do them after the mask is applied. - - # Check that counts have no NaN or Inf values. - if np.any(np.logical_or(adata.X < 0, np.isnan(self.adata.X))) or np.any(np.isinf(self.adata.X)): - raise ValueError("Counts cannot contain negative, NaN or Inf values.") - # Check that counts have numeric values. - if not np.issubdtype(self.adata.X.dtype, np.number): - raise ValueError("Counts must be numeric.") - - self._check_counts() - - self.layer = layer - if isinstance(design, str): - self.design = model_matrix(design, adata.obs) - else: - self.design = design - - def _check_count_matrix(self, array: np.ndarray | spmatrix, tolerance: float = 1e-6) -> bool: - if issparse(array): - if not array.data.dtype.kind == "i": - raise ValueError("Non-zero elements of the matrix must be integers.") - - if not np.all(np.abs(array.data - np.round(array.data)) < tolerance): - raise ValueError("Non-zero elements of the matrix must be close to integer values.") - else: - if not array.dtype.kind == "i" or not np.all(np.abs(array - np.round(array)) < tolerance): - raise ValueError("Matrix must be a count matrix.") - if (array < 0).sum() > 0: - raise ValueError("Non.zero elements of the matrix must be postiive.") - - return True - - @property - def variables(self): - """Get the names of the variables used in the model definition""" - return self.design.model_spec.variables_by_source["data"] - - @abstractmethod - def _check_counts(self) -> bool: - """ - Check that counts are valid for the specific method. - - Different methods may have different requirements. - - Returns - ------- - bool - True if counts are valid, False otherwise. - """ - ... - - @abstractmethod - def fit(self, **kwargs) -> None: - """ - Fit the model - - Parameters - ---------- - **kwargs - Additional arguments for fitting the specific method. - """ - ... - - @abstractmethod - def _test_single_contrast(self, contrast, **kwargs) -> pd.DataFrame: - ... - - def test_contrasts(self, contrasts: list[str] | dict[str, np.ndarray] | np.ndarray, **kwargs) -> pd.DataFrame: - """ - Conduct a specific test. Please use :method:`contrast` to build the contrasts instead of building it on your own. - - Parameters - ---------- - contrasts: - either a single contrast, or a dictionary of contrasts where the key is the name for that particular contrast. - Each contrast can be either a vector of coefficients (the most general case), a string, or a some fancy DSL - (details still need to be figured out). 
- - or a tuple withe three elements contrasts = ("condition", "control", "treatment") - """ - if not isinstance(contrasts, dict): - contrasts = {None: contrasts} - results = [] - for name, contrast in contrasts.items(): - results.append(self._test_single_contrast(contrast, **kwargs).assign(contrast=name)) - - results_df = pd.concat(results) - results_df.rename( - columns={"pvalue": "pvals", "padj": "pvals_adj", "log2FoldChange": "logfoldchanges"}, inplace=True - ) - - return results_df - - def test_reduced(self, modelB: "BaseMethod") -> pd.DataFrame: - """ - Test against a reduced model - - Parameters - ---------- - modelB: - the reduced model against which to test. - - Example: - -------- - ``` - modelA = Model().fit() - modelB = Model().fit() - modelA.test_reduced(modelB) - ``` - """ - raise NotImplementedError - - def cond(self, **kwargs) -> np.ndarray: - """ - The intention is to make contrasts using this function as in glmGamPoi - - >>> res < -test_de( - ... fit, contrast=cond(cell="B cells", condition="stim") - cond(cell="B cells", condition="ctrl") - ... ) - - Parameters - ---------- - **kwargs - - """ - - # TODO this is hacky - reach out to formulaic authors how to do this properly - def _get_var_from_colname(colname): - regex = re.compile(r"^.+\[T\.(.+)\]$") - return regex.search(colname).groups()[0] - - if not isinstance(self.design, ModelMatrix): - raise RuntimeError( - "Building contrasts with `cond` only works if you specified the model using a " - "formulaic formula. Please manually provide a contrast vector." - ) - cond_dict = kwargs - for var in self.variables: - var_type = self.design.model_spec.encoder_state[var][0].value - if var_type == "categorical": - all_categories = set(self.design.model_spec.encoder_state[var][1]["categories"]) - if var in kwargs: - if var_type == "categorical" and kwargs[var] not in all_categories: - raise ValueError( - f"You specified a non-existant category for {var}. Possible categories: {', '.join(all_categories)}" - ) - else: - # fill with default values - if var_type != "categorical": - cond_dict[var] = 0 - else: - var_cols = self.design.columns[self.design.columns.str.startswith(f"{var}[")] - - present_categories = {_get_var_from_colname(x) for x in var_cols} - dropped_category = all_categories - present_categories - assert len(dropped_category) == 1 - cond_dict[var] = next(iter(dropped_category)) - - df = pd.DataFrame([kwargs]) - - return self.design.model_spec.get_model_matrix(df) - - def contrast(self, column: str, baseline: str, group_to_compare: str) -> object: - """Build a simple contrast for pairwise comparisons. In the future all methods should be able to accept the output of :method:`StatsmodelsDE.contrast` but alas a big TODO.""" - return [column, baseline, group_to_compare] - - -class StatsmodelsDE(BaseMethod): - """Differential expression test using a statsmodels linear regression""" - - def _check_counts(self) -> bool: - return self._check_count_matrix(self.adata.X) - - def fit( - self, - regression_model: sm.OLS | sm.GLM = sm.OLS, - **kwargs, - ) -> None: - """ - Fit the specified regression model. - - Parameters - ---------- - regression_model - A statsmodels regression model class, either OLS or GLM. Defaults to OLS. - - **kwargs - Additional arguments for fitting the specific method. In particular, this - is where you can specify the family for GLM. 
- - Example - ------- - >>> import statsmodels.api as sm - >>> model = StatsmodelsDE(adata, design="~condition") - >>> model.fit(sm.GLM, family=sm.families.NegativeBinomial(link=sm.families.links.Log())) - >>> results = model.test_contrasts(np.array([0, 1])) - """ - self.models = [] - for var in tqdm(self.adata.var_names): - mod = regression_model( - sc.get.obs_df(self.adata, keys=[var], layer=self.layer)[var], - self.design, - **kwargs, - ) - mod = mod.fit() - self.models.append(mod) - - def _test_single_contrast(self, contrast, **kwargs) -> pd.DataFrame: - res = [] - for var, mod in zip(tqdm(self.adata.var_names), self.models, strict=False): - t_test = mod.t_test(contrast) - res.append( - { - "variable": var, - "pvalue": t_test.pvalue, - "tvalue": t_test.tvalue.item(), - "sd": t_test.sd.item(), - "logfoldchanges": t_test.effect.item(), - "padj": statsmodels.stats.multitest.fdrcorrection(np.array([t_test.pvalue]))[1].item(), - } - ) - return pd.DataFrame(res).sort_values("pvalue").set_index("variable") - - def contrast(self, column: str, baseline: str, group_to_compare: str) -> np.ndarray: - """ - Build a simple contrast for pairwise comparisons. - - This is equivalent to - - ``` - model.cond( = baseline) - model.cond( = group_to_compare) - ``` - """ - return self.cond(**{column: baseline}) - self.cond(**{column: group_to_compare}) - - -class PyDESeq2DE(BaseMethod): - """Differential expression test using a PyDESeq2""" - - def _check_counts(self) -> bool: - return self._check_count_matrix(self.adata.X) - - def fit(self, **kwargs) -> pd.DataFrame: - """ - Fit dds model using pydeseq2. - - Note: this creates its own AnnData object for downstream processing. - - Params: - ---------- - **kwargs - Keyword arguments specific to DeseqDataSet() - """ - inference = DefaultInference(n_cpus=3) - covars = self.design.columns.tolist() - if "Intercept" not in covars: - warnings.warn( - "Warning: Pydeseq is hard-coded to use Intercept, please include intercept into the model", stacklevel=2 - ) - processed_covars = [col.split("[")[0] for col in covars if col != "Intercept"] - dds = DeseqDataSet( - adata=self.adata, design_factors=processed_covars, refit_cooks=True, inference=inference, **kwargs - ) - # workaround code to insert design array - des_mtx_cols = dds.obsm["design_matrix"].columns - dds.obsm["design_matrix"] = self.design - if dds.obsm["design_matrix"].shape[1] == len(des_mtx_cols): - dds.obsm["design_matrix"].columns = des_mtx_cols.copy() - - dds.deseq2() - self.dds = dds - - def _test_single_contrast(self, contrast: list[str], alpha=0.05, **kwargs) -> pd.DataFrame: - """ - Conduct a specific test and returns a data frame - - Parameters - ---------- - contrasts: - list of three strings of the form - ["variable", "tested level", "reference level"] - alpha: p value threshold used for controlling fdr with - independent hypothesis weighting - kwargs: extra arguments to pass to DeseqStats() - """ - stat_res = DeseqStats(self.dds, contrast=contrast, alpha=alpha, **kwargs) - stat_res.summary() - stat_res.p_values - return pd.DataFrame(stat_res.results_df).sort_values("padj") - - -class EdgeRDE(BaseMethod): - """Differential expression test using EdgeR""" - - def _check_counts(self) -> bool: - return self._check_count_matrix(self.adata.X) - - def fit(self, **kwargs): # adata, design, mask, layer - """ - Fit model using edgeR. - - Note: this creates its own adata object for downstream. 
- - Params: - ---------- - **kwargs - Keyword arguments specific to glmQLFit() - ''' - - ## -- Check installations - """ - # For running in notebook - # pandas2ri.activate() - # rpy2.robjects.numpy2ri.activate() - - try: - import rpy2.robjects.numpy2ri - import rpy2.robjects.pandas2ri - from rpy2 import robjects as ro - from rpy2.robjects import numpy2ri, pandas2ri - from rpy2.robjects.conversion import localconverter - from rpy2.robjects.packages import importr - - pandas2ri.activate() - rpy2.robjects.numpy2ri.activate() - - except ImportError: - raise ImportError("edger requires rpy2 to be installed. ") from None - - try: - edger = importr("edgeR") - except ImportError: - raise ImportError( - "edgeR requires a valid R installation with the following packages: " "edgeR, BiocParallel, RhpcBLASctl" - ) from None - - ## -- Convert dataframe - # Feature selection - # if mask is not None: - # self.adata = self.adata[:,~self.adata.var[mask]] - - # Convert dataframe - with localconverter(ro.default_converter + numpy2ri.converter): - expr = self.adata.X if self.layer is None else self.adata.layers[self.layer] - if issparse(expr): - expr = expr.T.toarray() - else: - expr = expr.T - - expr_r = ro.conversion.py2rpy(pd.DataFrame(expr, index=self.adata.var_names, columns=self.adata.obs_names)) - - # Convert to DGE - dge = edger.DGEList(counts=expr_r, samples=self.adata.obs) - - # Run EdgeR - logging.info("Calculating NormFactors") - dge = edger.calcNormFactors(dge) - - logging.info("Estimating Dispersions") - dge = edger.estimateDisp(dge, design=self.design) - - logging.info("Fitting linear model") - fit = edger.glmQLFit(dge, design=self.design, **kwargs) - - # Save object - ro.globalenv["fit"] = fit - self.fit = fit - - def _test_single_contrast(self, contrast: list[str], **kwargs) -> pd.DataFrame: - """ - Conduct test for each contrast and return a data frame - - Parameters - ---------- - contrast: - numpy array of integars indicating contrast - i.e. [-1, 0, 1, 0, 0] - """ - ## -- Check installations - # For running in notebook - # pandas2ri.activate() - # rpy2.robjects.numpy2ri.activate() - - # ToDo: - # parse **kwargs to R function - # Fix mask for .fit() - - try: - import rpy2.robjects.numpy2ri - import rpy2.robjects.pandas2ri # noqa: F401 - from rpy2 import robjects as ro - from rpy2.robjects import numpy2ri, pandas2ri # noqa: F401 - from rpy2.robjects.conversion import localconverter # noqa: F401 - from rpy2.robjects.packages import importr - - except ImportError: - raise ImportError("edger requires rpy2 to be installed.") from None - - try: - importr("edgeR") - except ImportError: - raise ImportError( - "edgeR requires a valid R installation with the following packages: " "edgeR, BiocParallel, RhpcBLASctl" - ) from None - - # Convert vector to R, which drops a category like `self.design_matrix` to use the intercept for the left out. 
- contrast_vec = [0] * len(self.design.columns) - make_contrast_column_key = lambda ind: f"{contrast[0]}[T.{contrast[ind]}]" - for index in [1, 2]: - if make_contrast_column_key(index) in self.design.columns: - contrast_vec[self.design.columns.to_list().index(f"{contrast[0]}[T.{contrast[index]}]")] = 1 - contrast_vec_r = ro.conversion.py2rpy(np.asarray(contrast_vec)) - ro.globalenv["contrast_vec"] = contrast_vec_r - - # Test contrast with R - ro.r( - """ - test = edgeR::glmQLFTest(fit, contrast=contrast_vec) - de_res = edgeR::topTags(test, n=Inf, adjust.method="BH", sort.by="PValue")$table - """ - ) - - # Convert results to pandas - de_res = ro.conversion.rpy2py(ro.globalenv["de_res"]) - - return de_res.rename(columns={"PValue": "pvals", "logFC": "logfoldchanges", "FDR": "pvals_adj"}) diff --git a/src/multi_condition_comparisions/tl/wrapper.py b/src/multi_condition_comparisions/tl/wrapper.py deleted file mode 100644 index 48b8663..0000000 --- a/src/multi_condition_comparisions/tl/wrapper.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import Any, Literal, TypedDict - -import numpy as np -from anndata import AnnData - -from multi_condition_comparisions.tl.de import EdgeRDE, PyDESeq2DE, StatsmodelsDE - -MethodRegistry: dict[str, Any] = {"DESeq": PyDESeq2DE, "edgeR": EdgeRDE, "statsmodels": StatsmodelsDE} - - -class Contrast(TypedDict): - """Contrast typed dict.""" - - column: str - baseline: str - group_to_compare: str - - -def run_de( - adata: AnnData, - contrasts: dict[str, Contrast | list[str]], - method: Literal["DESeq", "edgeR", "statsmodels"], - design: str | np.ndarray | None = None, - mask: str | None = None, - layer: str | None = None, - **kwargs, -): - """ - Wrapper function to run differential expression analysis. - - Params: - ---------- - adata - AnnData object, usually pseudobulked. - contrasts - Contrasts to perform. Mapping from names to tests. - method - Method to perform DE. - design (optional) - Model design. Can be either a design matrix, a formulaic formula. If None, contrasts should be provided. - mask (optional) - A column in adata.var that contains a boolean mask with selected features. - layer (optional) - Layer to use in fit(). If None, use the X matrix. - **kwargs - Keyword arguments specific to the method implementation. 
- """ - ## Initialise object - model = MethodRegistry[method](adata, design, mask=mask, layer=layer) - - ## Fit model - model.fit(**kwargs) - - ## Test contrasts - de_res = model.test_contrasts( - { - name: model.contrast(**contrast) if isinstance(contrast, dict) else model.contrast(*contrast) - for name, contrast in contrasts.items() - }, - **kwargs, - ) - - return de_res diff --git a/tests/conftest.py b/tests/conftest.py index 43d7c8a..7371e3e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,9 @@ import anndata as ad +import numpy as np +import pandas as pd import pytest from pydeseq2.utils import load_example_data -from multi_condition_comparisions.tl.de import StatsmodelsDE - @pytest.fixture def test_counts(): @@ -29,5 +29,33 @@ def test_adata(test_counts, test_metadata): @pytest.fixture -def statsmodels_stub(test_adata): - return StatsmodelsDE(adata=test_adata, design="~condition") +def test_adata_minimal(): + n_obs = 80 + n_donors = n_obs // 4 + obs = pd.DataFrame( + { + "condition": ["A", "B"] * (n_obs // 2), + "donor": sum(([f"D{i}"] * n_donors for i in range(n_obs // n_donors)), []), + "other": (["X"] * (n_obs // 4)) + (["Y"] * ((3 * n_obs) // 4)), + "pairing": sum(([str(i), str(i)] for i in range(n_obs // 2)), []), + "continuous": np.random.uniform(0, 1) * 4000, + }, + ) + var = pd.DataFrame(index=["gene1", "gene2"]) + rng = np.random.default_rng(9) # make tests deterministic + group1 = rng.negative_binomial(20, 0.1, n_obs // 2) # large mean + group2 = rng.negative_binomial(5, 0.5, n_obs // 2) # small mean + + condition_data = np.empty((n_obs,), dtype=group1.dtype) + condition_data[0::2] = group1 + condition_data[1::2] = group2 + + donor_data = np.empty((n_obs,), dtype=group1.dtype) + donor_data[0:n_donors] = group2[:n_donors] + donor_data[n_donors : (2 * n_donors)] = group1[n_donors:] + + donor_data[(2 * n_donors) : (3 * n_donors)] = group2[:n_donors] + donor_data[(3 * n_donors) :] = group1[n_donors:] + + X = np.vstack([condition_data, donor_data]).T + return ad.AnnData(X=X, obs=obs, var=var) diff --git a/tests/test_de.py b/tests/test_de.py index e39cdd2..a57f6ae 100644 --- a/tests/test_de.py +++ b/tests/test_de.py @@ -4,7 +4,7 @@ from pandas import testing as tm import multi_condition_comparisions -from multi_condition_comparisions.tl.de import EdgeRDE, PyDESeq2DE, StatsmodelsDE +from multi_condition_comparisions.methods import EdgeR, PyDESeq2, Statsmodels, WilcoxonTest def test_package_has_version(): @@ -15,10 +15,10 @@ def test_package_has_version(): "method_class,kwargs", [ # OLS - (StatsmodelsDE, {}), + (Statsmodels, {}), # Negative Binomial ( - StatsmodelsDE, + Statsmodels, {"regression_model": sm.GLM, "family": sm.families.NegativeBinomial()}, ), ], @@ -40,7 +40,7 @@ def test_pydeseq2_simple(test_adata): 2. Fitted 3. and that test_contrast returns a DataFrame with the correct number of rows. """ - method = PyDESeq2DE(adata=test_adata, design="~condition") + method = PyDESeq2(adata=test_adata, design="~condition") method.fit() res_df = method.test_contrasts(["condition", "A", "B"]) @@ -54,7 +54,7 @@ def test_edger_simple(test_adata): 2. Fitted 3. and that test_contrast returns a DataFrame with the correct number of rows. """ - method = EdgeRDE(adata=test_adata, design="~condition") + method = EdgeR(adata=test_adata, design="~condition") method.fit() res_df = method.test_contrasts(["condition", "A", "B"]) @@ -66,7 +66,7 @@ def test_pydeseq2_complex(test_adata): method returns a dataframe with the correct number of rows. 
""" test_adata.obs["condition1"] = test_adata.obs["condition"].copy() - method = PyDESeq2DE(adata=test_adata, design="~condition1+group") + method = PyDESeq2(adata=test_adata, design="~condition1+group") method.fit() res_df = method.test_contrasts(["condition1", "A", "B"]) @@ -84,7 +84,7 @@ def test_edger_complex(test_adata): method returns a dataframe with the correct number of rows. """ test_adata.obs["condition1"] = test_adata.obs["condition"].copy() - method = EdgeRDE(adata=test_adata, design="~condition1+group") + method = EdgeR(adata=test_adata, design="~condition1+group") method.fit() res_df = method.test_contrasts(["condition1", "A", "B"]) @@ -95,3 +95,13 @@ def test_edger_complex(test_adata): expected_columns = {"pvals", "pvals_adj", "logfoldchanges"} assert expected_columns.issubset(set(res_df.columns)) assert np.all((0 <= res_df["pvals"]) & (res_df["pvals"] <= 1)) + + +@pytest.mark.parametrize("paired_by", [None, "pairings"]) +def test_non_parametric(test_adata, paired_by): + if paired_by is not None: + test_adata.obs[paired_by] = list(range(sum(test_adata.obs["condition"] == "A"))) * 2 + res_df = WilcoxonTest.compare_groups( + adata=test_adata, column="condition", baseline="A", groups_to_compare="B", paired_by=paired_by + ) + assert np.all((0 <= res_df["pvals"]) & (res_df["pvals"] <= 1)) # TODO: which of these should actually be <.05? diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index 45e0084..c6e9a10 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -1,10 +1,10 @@ import anndata as ad import numpy as np import pytest -import scipy as sp from scipy.sparse import csr_matrix -from multi_condition_comparisions.tl.de import StatsmodelsDE +from multi_condition_comparisions._util import check_is_integer_matrix +from multi_condition_comparisions.methods import Statsmodels @pytest.mark.parametrize("invalid_input", [np.nan, np.inf, "foo"]) @@ -13,44 +13,44 @@ def test_invalid_inputs(invalid_input, test_counts, test_metadata): test_counts[0, 0] = invalid_input adata = ad.AnnData(X=test_counts, obs=test_metadata) with pytest.raises((ValueError, TypeError)): - StatsmodelsDE(adata=adata, design="~condition") + Statsmodels(adata=adata, design="~condition") -def test_valid_count_matrix(statsmodels_stub): +def test_valid_count_matrix(): """Test with a valid count matrix.""" matrix = np.array([[1, 2], [3, 4]]) - assert statsmodels_stub._check_count_matrix(matrix) + check_is_integer_matrix(matrix) -def test_valid_sparse_count_matrix(statsmodels_stub): +def test_valid_sparse_count_matrix(): """Test with a valid sparse count matrix.""" - matrix = sp.sparse.csr_matrix([[1, 0], [0, 2]]) - assert statsmodels_stub._check_count_matrix(matrix) + matrix = csr_matrix(np.array([[1, 0], [0, 2]]), dtype="int32") + check_is_integer_matrix(matrix) -def test_negative_values(statsmodels_stub): +def test_negative_values(): """Test with a matrix containing negative values.""" matrix = np.array([[1, -2], [3, 4]]) with pytest.raises(ValueError): - statsmodels_stub._check_count_matrix(matrix) + check_is_integer_matrix(matrix) -def test_non_integer_values(statsmodels_stub): +def test_non_integer_values(): """Test with a matrix containing non-integer values.""" matrix = np.array([[1.5, 2], [3, 4]]) with pytest.raises(ValueError): - statsmodels_stub._check_count_matrix(matrix) + check_is_integer_matrix(matrix) -def test_matrix_with_nans(statsmodels_stub): +def test_matrix_with_nans(): """Test with a matrix containing NaNs.""" matrix = np.array([[1, np.nan], [3, 4]]) with 
pytest.raises(ValueError): - statsmodels_stub._check_count_matrix(matrix) + check_is_integer_matrix(matrix) -def test_sparse_matrix_with_nans(statsmodels_stub): +def test_sparse_matrix_with_nans(): """Test with a sparse matrix containing NaNs.""" matrix = csr_matrix([[1, np.nan], [3, 4]]) with pytest.raises(ValueError): - statsmodels_stub._check_count_matrix(matrix) + check_is_integer_matrix(matrix) diff --git a/tests/test_unified_api.py b/tests/test_unified_api.py new file mode 100644 index 0000000..951caa3 --- /dev/null +++ b/tests/test_unified_api.py @@ -0,0 +1,26 @@ +import pytest + +from multi_condition_comparisions.methods import EdgeR, PyDESeq2, Statsmodels, WilcoxonTest + + +@pytest.mark.parametrize("method", [WilcoxonTest, Statsmodels, PyDESeq2, EdgeR]) +@pytest.mark.parametrize("paired_by", ["pairing", None]) +def test_unified(test_adata_minimal, method, paired_by): + res_df = method.compare_groups( + adata=test_adata_minimal, column="condition", baseline="A", groups_to_compare="B", paired_by=paired_by + ) + if paired_by is None: + assert res_df.loc["gene1"]["pvals"] < 0.005 + assert res_df.loc["gene2"]["pvals"] > 0.005 + else: + assert res_df.loc["gene2"]["pvals"] > res_df.loc["gene1"]["pvals"] + res_df = method.compare_groups( + adata=test_adata_minimal, + column="donor", + baseline="D0", + groups_to_compare=["D1", "D2", "D3"], + paired_by=paired_by, + ) + assert res_df.loc["gene2"]["pvals"].values[0] < 0.005 + assert res_df.loc["gene2"]["pvals"].values[2] < 0.005 + assert res_df.loc["gene2"]["pvals"].values[1] > 0.005 # D1 is from same distribution diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py deleted file mode 100644 index 43df569..0000000 --- a/tests/test_wrapper.py +++ /dev/null @@ -1,89 +0,0 @@ -from inspect import signature -from typing import get_args - -import numpy as np -import pytest -from pandas import testing as tm - -from multi_condition_comparisions.tl.wrapper import MethodRegistry, run_de - - -def test_arg_types(): - run_de_args = set(get_args(signature(run_de).parameters["method"].annotation)) - assert run_de_args == set(MethodRegistry.keys()), run_de_args - - -@pytest.mark.parametrize( - "method", - list(MethodRegistry.keys()), -) -@pytest.mark.parametrize( - "design,contrasts", - [ - ( - "~condition", - { - "first": {"column": "condition", "baseline": "A", "group_to_compare": "B"}, - }, - ), - ( - "~condition", - { - "first": {"column": "condition", "baseline": "A", "group_to_compare": "B"}, - "second": {"column": "condition", "baseline": "B", "group_to_compare": "A"}, - }, - ), - ( - "~condition", - { - "first": ["condition", "A", "B"], - }, - ), - ( - "~condition", - { - "first": ["condition", "A", "B"], - "second": ["condition", "B", "A"], - }, - ), - ( - "~condition1+group", - { - "first": {"column": "condition1", "baseline": "A", "group_to_compare": "B"}, - }, - ), - ( - "~condition1+group", - { - "first": {"column": "condition1", "baseline": "A", "group_to_compare": "B"}, - "second": {"column": "condition1", "baseline": "B", "group_to_compare": "A"}, - }, - ), - ( - "~condition1+group", - { - "first": ["condition1", "A", "B"], - }, - ), - ( - "~condition1+group", - { - "first": ["condition1", "A", "B"], - "second": ["condition1", "B", "A"], - }, - ), - ], -) -def test_run_de(test_adata, method, design, contrasts): - test_adata.obs["condition1"] = test_adata.obs["condition"].copy() - res_df = run_de(adata=test_adata, method=method, contrasts=contrasts, design=design) - - assert len(res_df) == test_adata.n_vars * len(contrasts) - # Check that 
the index of the result per group matches the var_names of the AnnData object - tm.assert_index_equal( - test_adata.var_names, res_df.index[: len(res_df) // len(contrasts)], check_order=False, check_names=False - ) - - expected_columns = {"pvals", "pvals_adj", "logfoldchanges"} - assert expected_columns.issubset(set(res_df.columns.tolist())) - assert np.all((0 <= res_df["pvals"]) & (res_df["pvals"] <= 1))