diff --git a/src/multi_condition_comparisions/_util/__init__.py b/src/multi_condition_comparisions/_util/__init__.py new file mode 100644 index 0000000..a98b31a --- /dev/null +++ b/src/multi_condition_comparisions/_util/__init__.py @@ -0,0 +1,3 @@ +from .checks import check_is_integer_matrix, check_is_numeric_matrix + +__all__ = ["check_is_integer_matrix", "check_is_numeric_matrix"] diff --git a/src/multi_condition_comparisions/_util.py b/src/multi_condition_comparisions/_util/checks.py similarity index 100% rename from src/multi_condition_comparisions/_util.py rename to src/multi_condition_comparisions/_util/checks.py diff --git a/src/multi_condition_comparisions/_util/formulaic.py b/src/multi_condition_comparisions/_util/formulaic.py new file mode 100644 index 0000000..11a9d99 --- /dev/null +++ b/src/multi_condition_comparisions/_util/formulaic.py @@ -0,0 +1,203 @@ +"""Helpers to interact with Formulaic Formulas + +Some helpful definitions for working with formulaic formulas (e.g. `~ 0 + C(donor):treatment + np.log1p(continuous)`): + * A *term* refers to an expression in the formula, separated by `+`, e.g. `C(donor):treatment`, or `np.log1p(continuous)`. + * A *variable* refers to a column of the data frame passed to formulaic, e.g. `donor`. + * A *factor* is the specification of how a certain variable is represented in the design matrix, e.g. treatment coding with base level "A" and reduced rank. +""" + +from collections import defaultdict +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from typing import Any + +from formulaic import FactorValues, ModelSpec +from formulaic.materializers import PandasMaterializer +from formulaic.materializers.types import EvaluatedFactor +from formulaic.parser.types import Factor +from interface_meta import override + + +@dataclass +class FactorMetadata: + """Store (relevant) metadata for a factor of a formula.""" + + name: str + """The unambiguous factor name as specified in the formula. E.g. `donor`, or `C(donor, contr.treatment(base="A"))`""" + + reduced_rank: bool + """Whether a column will be dropped because it is redundant""" + + custom_encoder: bool + """Whether or not a custom encoder (e.g. `C(...)`) was used.""" + + categories: Sequence[str] + """The unique categories in this factor (after applying `drop_rows`)""" + + kind: Factor.Kind + """Type of the factor""" + + drop_field: str = None + """ + The category that is dropped. + + Note that + * this may also be populated if `reduced_rank = False` + * this is only populated when no encoder was used (e.g. `~ donor` but NOT `~ C(donor)`. + """ + + column_names: Sequence[str] = None + """ + The column names for this factor included in the design matrix. + + This may be the same as `categories` if the default encoder is used, or + categories without the base level if a custom encoder (e.g. `C(...)`) is used. + """ + + colname_format: str = None + """A formattable string that can be used to generate the column name in the design matrix, e.g. `{name}[T.{field}]`""" + + @property + def base(self) -> str | None: + """ + The base category for this categorical factor. + + This is derived from `drop_field` (for default encoding) or by comparing the column names in + the design matrix with all categories (for custom encoding, e.g. `C(...)`). + """ + if not self.reduced_rank: + return None + else: + if self.custom_encoder: + tmp_base = set(self.categories) - set(self.column_names) + assert len(tmp_base) == 1 + return tmp_base.pop() + else: + assert self.drop_field is not None + return self.drop_field + + +def get_factor_storage_and_materializer() -> tuple[dict[str, list[FactorMetadata]], dict[str, set[str]], type]: + """ + Keep track of categorical factors used in a model spec. + + Generates a custom materializers that reports back certain metadata upon materialization of the model matrix. + + Returns + ------- + factor_storage + A dictionary pointing to Metadata for each factor processed by the custom materializer + variable_to_factors + A dictionary mapping variables to factor names (similar to model_spec.variable_terms), except that it maps + to *factors* rather than *terms* + CustomPandasMaterializer + A materializer class that is tied to the particular instance of `factor_storage`. + """ + # There can be multiple FactorMetadata entries per sample, for instance when formulaic generates an interaction + # term, it generates the factor with both full rank and reduced rank. + factor_storage: dict[str, list[FactorMetadata]] = defaultdict(list) + variable_to_factors: dict[str, set[str]] = defaultdict(set) + + class CustomPandasMaterializer(PandasMaterializer): + """An extension of the PandasMaterializer that records all categorical variables and their (base) categories.""" + + REGISTER_NAME = "custom_pandas" + REGISTER_INPUTS = ("pandas.core.frame.DataFrame",) + REGISTER_OUTPUTS = ("pandas", "numpy", "sparse") + + def __init__( + self, + data: Any, + context: Mapping[str, Any] | None = None, + record_factor_metadata: bool = False, + **params: Any, + ): + """ + Initialize the Materializer + + Parameters + ---------- + data + passed to PandasMaterializer + context + passed to PandasMaterializer + record_factor_metadata + Flag that tells whether this particular instance of the custom materializer class + is supposed to record factor metadata. Only the instance that is used for building the design + matrix should record the metadata. All other instances (e.g. used to generate contrast vectors) + should not record metadata to not overwrite the specifications from the design matrix. + **params + passed to PandasMaterializer + """ + self.factor_metadata_storage = factor_storage if record_factor_metadata else None + self.variable_to_factors = variable_to_factors if record_factor_metadata else None + # temporary pointer to metadata of factor that is currently evaluated + self._current_factor: FactorMetadata = None + super().__init__(data, context, **params) + + @override + def _encode_evaled_factor( + self, factor: EvaluatedFactor, spec: ModelSpec, drop_rows: Sequence[int], reduced_rank: bool = False + ) -> dict[str, Any]: + """ + Function is called just before the factor is evaluated. + + We can record some metadata, before we call the original function. + """ + assert ( + self._current_factor is None + ), "_current_factor should always be None when we start recording metadata" + if self.factor_metadata_storage is not None: + # Don't store if the factor is cached (then we should already have recorded it) + if factor.expr in self.encoded_cache or (factor.expr, reduced_rank) in self.encoded_cache: + assert factor.expr in self.factor_metadata_storage, "Factor should be there since it's cached" + else: + for var in factor.variables: + self.variable_to_factors[var].add(factor.expr) + self._current_factor = FactorMetadata( + name=factor.expr, + reduced_rank=reduced_rank, + categories=tuple(sorted(factor.values.drop(index=factor.values.index[drop_rows]).unique())), + custom_encoder=factor.metadata.encoder is not None, + kind=factor.metadata.kind, + ) + return super()._encode_evaled_factor(factor, spec, drop_rows, reduced_rank) + + @override + def _flatten_encoded_evaled_factor(self, name: str, values: FactorValues[dict]) -> dict[str, Any]: + """ + Function is called at the end, before the design matrix gets materialized. + + Here we have access to additional metadata, such as `drop_field`. + """ + if self._current_factor is not None: + assert self._current_factor.name == name + self._current_factor.drop_field = values.__formulaic_metadata__.drop_field + self._current_factor.column_names = values.__formulaic_metadata__.column_names + self._current_factor.colname_format = values.__formulaic_metadata__.format + self.factor_metadata_storage[name].append(self._current_factor) + self._current_factor = None + + return super()._flatten_encoded_evaled_factor(name, values) + + return factor_storage, variable_to_factors, CustomPandasMaterializer + + +class AmbiguousAttributeError(ValueError): + pass + + +def resolve_ambiguous(objs: Sequence[Any], attr: str) -> Any: + """Given a list of objects, return an attribute if it is the same between all object. Otherwise raise an error.""" + if not objs: + raise ValueError("Collection is empty") + + first_obj_attr = getattr(objs[0], attr) + + # Check if the attribute is the same for all objects + for obj in objs[1:]: + if getattr(obj, attr) != first_obj_attr: + raise AmbiguousAttributeError(f"Ambiguous attribute '{attr}': values differ between objects") + + # If attribute is the same for all objects, return it + return first_obj_attr diff --git a/src/multi_condition_comparisions/methods/_base.py b/src/multi_condition_comparisions/methods/_base.py index b834207..a8b05ab 100644 --- a/src/multi_condition_comparisions/methods/_base.py +++ b/src/multi_condition_comparisions/methods/_base.py @@ -1,16 +1,22 @@ -import re from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from dataclasses import dataclass +from itertools import chain from types import MappingProxyType +from typing import Any import numpy as np import pandas as pd from anndata import AnnData -from formulaic import model_matrix -from formulaic.model_matrix import ModelMatrix from multi_condition_comparisions._util import check_is_numeric_matrix +from multi_condition_comparisions._util.formulaic import ( + AmbiguousAttributeError, + Factor, + FactorMetadata, + get_factor_storage_and_materializer, + resolve_ambiguous, +) @dataclass @@ -147,8 +153,17 @@ def __init__( super().__init__(adata, mask=mask, layer=layer) self._check_counts() + self.factor_storage: dict[str, list[FactorMetadata]] | None = None + """Object to store metadata of formulaic factors which is used for building contrasts later. If a design matrix + is passed directly, this remains None.""" + + self.variable_to_factors: dict[str, set[str]] | None = None + """Stores mapping from variables to formulaic factors""" + if isinstance(design, str): - self.design = model_matrix(design, adata.obs) + # Use custom formulaic materializer that will record factor metadata while creating the model matrix + self.factor_storage, self.variable_to_factors, materializer_class = get_factor_storage_and_materializer() + self.design = materializer_class(adata.obs, record_factor_metadata=True).get_model_matrix(design) else: self.design = design @@ -216,7 +231,7 @@ def compare_groups( return de_res @property - def variables(self): + def variables(self) -> set: """Get the names of the variables used in the model definition""" try: return self.design.model_spec.variables_by_source["data"] @@ -252,22 +267,25 @@ def fit(self, **kwargs) -> None: ... @abstractmethod - def _test_single_contrast(self, contrast, **kwargs) -> pd.DataFrame: ... + def _test_single_contrast(self, contrast: Sequence[float], **kwargs) -> pd.DataFrame: ... - def test_contrasts( - self, contrasts: list[str] | dict[str, np.ndarray] | dict[str, list] | np.ndarray, **kwargs - ) -> pd.DataFrame: + def test_contrasts(self, contrasts: Sequence[float] | dict[str, Sequence[float]], **kwargs) -> pd.DataFrame: """ - Conduct a specific test. + Perform a comparison as specified in a contrast vector. + + The contrast vector is a numeric vector with one element for each column in the design matrix. + We recommend building the contrast vector using {func}`~LinearModelBase.contrast` or + {func}`~LinearModelBase.cond`. - Please use :method:`contrast` to build the contrasts instead of building it on your own. + Multiple comparisons can be specified as a dictionary of contrast vectors. Parameters ---------- contrasts: - either a single contrast, or a dictionary of contrasts where the key is the name for that particular contrast. - Each contrast can be either a vector of coefficients (the most general case), a string, a DSL (work in progress) - or a tuple with three elements contrasts = ("condition", "control", "treatment") + Either a numeric contrast vector, or a dictionary of numeric contrast vectors. The dictionary keys + are added in a column named `contrast` in the result dataframe. + kwargs + are passed to the respective implementation """ if not isinstance(contrasts, dict): contrasts = {None: contrasts} @@ -300,57 +318,165 @@ def test_reduced(self, modelB: "MethodBase") -> pd.DataFrame: def cond(self, **kwargs) -> np.ndarray: """ - The intention is to make contrasts using this function as in glmGamPoi + Get a contrast vector representing a specific condition. + + The resulting contrast vector can be combined using arithmetic operations to generate a contrast + vector for a specific comparison, e.g. for a simple comparison treated vs. control + + >>> contrast = model.cond(condition="treated") - model.cond(condition="ctrl") + + When using an interaction term `cell * condition`, a comparison within a specific cell-type can be + easily built as follows: + + >>> contrast = model.cond(cell="B cells", condition="ctrl") - cond(cell="B cells", condition="treated") - >>> res < -test_de( - ... fit, contrast=cond(cell="B cells", condition="stim") - cond(cell="B cells", condition="ctrl") - ... ) + This way of building contrast vectors is inspired by [glmGamPoi](https://bioconductor.org/packages/release/bioc/html/glmGamPoi.html). Parameters ---------- **kwargs + column/value pairs + Returns + ------- + A contrast vector that aligns to the columns of the design matrix. """ - - # TODO this is hacky - reach out to formulaic authors how to do this properly - def _get_var_from_colname(colname): - regex = re.compile(r"^.+\[T\.(.+)\]$") - return regex.search(colname).groups()[0] - - if not isinstance(self.design, ModelMatrix): + if self.factor_storage is None: raise RuntimeError( "Building contrasts with `cond` only works if you specified the model using a " "formulaic formula. Please manually provide a contrast vector." ) + cond_dict = kwargs + + if not set(cond_dict.keys()).issubset(self.variables): + raise ValueError( + "You specified a variable that is not part of the model. Available variables: " + + ",".join(self.variables) + ) + + # We now fill `cond_dict` such that it is equivalent to a data row from `adata.obs`. + # This data can then be passed to the `get_model_matrix` of formulaic to retrieve a corresponding + # contrast vector. + # To do so, we keep the values that were already specified, and fill all other values with the default category + # (the one that is usually dropped from the model for being redundant). + # + # self.variables only contains "data-variables", but no "factor variables", such as `C` for var in self.variables: - var_type = self.design.model_spec.encoder_state[var][0].value - if var_type == "categorical": - all_categories = set(self.design.model_spec.encoder_state[var][1]["categories"]) - if var in kwargs: - if var_type == "categorical" and kwargs[var] not in all_categories: - raise ValueError( - f"You specified a non-existant category for {var}. Possible categories: {', '.join(all_categories)}" - ) - else: - # fill with default values - if var_type != "categorical": - cond_dict[var] = 0 - else: - var_cols = self.design.columns[self.design.columns.str.startswith(f"{var}[")] + # If the variable is specified in the cond_dict explicitly, we just keep it as is. + # We still verify that it's a valid category, otherwise simple typos are not caught and lead to + # zeros in the design matrix. + if var in cond_dict: + self._check_category(var, cond_dict[var]) - present_categories = {_get_var_from_colname(x) for x in var_cols} - dropped_category = all_categories - present_categories - assert len(dropped_category) == 1 - cond_dict[var] = next(iter(dropped_category)) + # If the variable is not specified, we want to fill it with its default value (i.e. the base category) + else: + cond_dict[var] = self._get_default_value(var) df = pd.DataFrame([kwargs]) + return self.design.model_spec.get_model_matrix(df).iloc[0] + + def _get_factor_metadata_for_variable(self, var) -> list[FactorMetadata]: + """ + Get the Metadata objects of all factors defined by a given variable. + + A variable can refer to one or multiple factors. Either if a variable is specified + multiple times in the model (e.g. ~ var + C(var); ~ continuous + np.log(continuous)) + or when there's an interaction term (e.g. ~ A * B ==> terms `A`, `B`, `A:B`). + + Some Factors can have multiple metadata objects, because they are specified + multiple times in the formula, or forumlaic resolves them multiple times internally. + + Even if there are multiple factors per variable, they could still contain the same metadata and we + can unambiguously retreive metadata for a given variable, e.g. using the `_util.formulaic.resolve_ambiguous` + function. + + Returns + ------- + a list of FactorMetadata objects + """ + factors = self.variable_to_factors[var] + + return list(chain.from_iterable(self.factor_storage[f] for f in factors)) + + def _get_default_value(self, var) -> Any: + r""" + Get the default value (base category) for variable `var`. - return self.design.model_spec.get_model_matrix(df) + Special cases: + * If the variable is continuous, return 0. + * For a variable without reduced rank (as it happens for the first variable in a no-intercept model `~0 + xxx`) + the function returns the null string "\0". This is to ensure that when passed to `formulaic.get_model_matrix`, + a row is returned that sets all columns of that variable to 0. It could be any string that is not + a valid category. It cannot be `None`, because this results in failure of `get_model_matrix`, therefore the + null string was chosen because of clear semantics and it being surely not a valid category. - def contrast(self, column: str, baseline: str, group_to_compare: str) -> list: - """Build a simple contrast for pairwise comparisons. + Raises + ------ + ValueError + If there is no way to unambiguously infer the base category from the formulaic formula. + + Returns + ------- + The default value of the given variable. + """ + factor_metadata = self._get_factor_metadata_for_variable(var) + if resolve_ambiguous(factor_metadata, "kind") == Factor.Kind.CATEGORICAL: + # In this case it can be ambiguous for some formulas -> Tell the user to specify the variable explicitly + try: + tmp_base = resolve_ambiguous(factor_metadata, "base") + except AmbiguousAttributeError as e: + raise ValueError( + f"Could not automatically resolve base category for variable {var}. Please specify it explicity in `model.cond`." + ) from e + + # if tmp_base is None (no-intercept model), set it to the NUL string \0. + # In principle, it could be any string that is not a valid category, but it cannot be None + # because this leads to an error in `get_model_matrix` + return tmp_base if tmp_base is not None else "\0" + else: + # Set to zero for continuous variables + return 0 + + def _check_category(self, var, value) -> None: + """ + Check if `value` is a valid category of the factor defined by `var`. + + If `var` is a continuous variable this passes silently. + + Raises + ------ + ValueError + If `value` is not a valid category. + """ + factor_metadata = self._get_factor_metadata_for_variable(var) + + # Getting the categories should never be non-ambiguous. If it happens, it's an edge case we don't know about (-> let it fail) + tmp_categories = resolve_ambiguous(factor_metadata, "categories") + if resolve_ambiguous(factor_metadata, "kind") == Factor.Kind.CATEGORICAL and value not in tmp_categories: + raise ValueError( + f"You specified a non-existant category for {var}. Possible categories: {', '.join(tmp_categories)}" + ) + + def contrast(self, column: str, baseline: str, group_to_compare: str) -> pd.Series: + """ + Build a simple contrast for pairwise comparisons. + + This is an alias for + + >>> model.cond(column=group_to_compare) - model.cond(column=baseline) - In the future all methods should be able to accept the output of :method:`StatsmodelsDE.contrast` but alas a big TODO. + Parameters + ---------- + column + column in adata.obs to test on + baseline + baseline category (denominator) + group_to_compare + category to compare against baseline (nominator) + + Returns + ------- + Numeric contrast vector """ - return [column, baseline, group_to_compare] + return self.cond(**{column: group_to_compare}) - self.cond(**{column: baseline}) diff --git a/src/multi_condition_comparisions/methods/_edger.py b/src/multi_condition_comparisions/methods/_edger.py index dd328bb..a7691c5 100644 --- a/src/multi_condition_comparisions/methods/_edger.py +++ b/src/multi_condition_comparisions/methods/_edger.py @@ -1,3 +1,5 @@ +from collections.abc import Sequence + import numpy as np import pandas as pd from scanpy import logging @@ -48,11 +50,11 @@ def fit(self, **kwargs): # adata, design, mask, layer try: edger = importr("edgeR") - except ImportError: + except ImportError as e: raise ImportError( "edgeR requires a valid R installation with the following packages:\n" "edgeR, BiocParallel, RhpcBLASctl" - ) from None + ) from e # Convert dataframe with localconverter(ro.default_converter + numpy2ri.converter): @@ -81,7 +83,7 @@ def fit(self, **kwargs): # adata, design, mask, layer ro.globalenv["fit"] = fit self.fit = fit - def _test_single_contrast(self, contrast: list[str], **kwargs) -> pd.DataFrame: + def _test_single_contrast(self, contrast: Sequence[float], **kwargs) -> pd.DataFrame: """ Conduct test for each contrast and return a data frame @@ -119,12 +121,7 @@ def _test_single_contrast(self, contrast: list[str], **kwargs) -> pd.DataFrame: ) from None # Convert vector to R, which drops a category like `self.design_matrix` to use the intercept for the left out. - contrast_vec = [0] * len(self.design.columns) - make_contrast_column_key = lambda ind: f"{contrast[0]}[T.{contrast[ind]}]" - for index in [1, 2]: - if make_contrast_column_key(index) in self.design.columns: - contrast_vec[self.design.columns.to_list().index(f"{contrast[0]}[T.{contrast[index]}]")] = 1 - contrast_vec_r = ro.conversion.py2rpy(np.asarray(contrast_vec)) + contrast_vec_r = ro.conversion.py2rpy(np.asarray(contrast)) ro.globalenv["contrast_vec"] = contrast_vec_r # Test contrast with R diff --git a/src/multi_condition_comparisions/methods/_pydeseq2.py b/src/multi_condition_comparisions/methods/_pydeseq2.py index f46d775..5cdc666 100644 --- a/src/multi_condition_comparisions/methods/_pydeseq2.py +++ b/src/multi_condition_comparisions/methods/_pydeseq2.py @@ -63,18 +63,27 @@ def fit(self, **kwargs) -> pd.DataFrame: dds.deseq2() self.dds = dds - def _test_single_contrast(self, contrast: list[str], alpha=0.05, **kwargs) -> pd.DataFrame: + # TODO: PyDeseq2 doesn't support arbitrary designs and contrasts yet + # see https://github.com/owkin/PyDESeq2/issues/213 + # + # Therefore these functions are overridden in a way to make it work with PyDESeq2, + # ingoring the inconsistency of function signatures. Once arbitrary design + # matrices and contrasts are supported by PyDEseq2, we can fully support the + # Linear model interface. + def _test_single_contrast(self, contrast: list[str], alpha=0.05, **kwargs) -> pd.DataFrame: # type: ignore """ Conduct a specific test and returns a Pandas DataFrame. + Note that PyDESeq2 doesn't support arbitrary contrasts yet. + Parameters ---------- - contrasts: - list of three strings of the form - ["variable", "tested level", "reference level"] - alpha: p value threshold used for controlling fdr with - independent hypothesis weighting - kwargs: extra arguments to pass to DeseqStats() + contrasts + list of three strings of the form `["variable", "tested level", "reference level"]`. + alpha + p value threshold used for controlling fdr with independent hypothesis weighting + kwargs + extra arguments to pass to DeseqStats() """ stat_res = DeseqStats(self.dds, contrast=contrast, alpha=alpha, **kwargs) # Calling `.summary()` is required to fill the `results_df` data frame @@ -87,3 +96,11 @@ def _test_single_contrast(self, contrast: list[str], alpha=0.05, **kwargs) -> pd res_df.index.name = "variable" res_df = res_df.reset_index() return res_df + + def cond(self, **kwargs) -> ndarray: + raise NotImplementedError( + "PyDESeq2 currently doesn't support arbitratry contrasts, see https://github.com/owkin/PyDESeq2/issues/213" + ) + + def contrast(self, column: str, baseline: str, group_to_compare: str) -> tuple[str, str, str]: # type: ignore + return (column, group_to_compare, baseline) diff --git a/src/multi_condition_comparisions/methods/_statsmodels.py b/src/multi_condition_comparisions/methods/_statsmodels.py index 1bc4ef5..1fda634 100644 --- a/src/multi_condition_comparisions/methods/_statsmodels.py +++ b/src/multi_condition_comparisions/methods/_statsmodels.py @@ -18,7 +18,7 @@ def _check_counts(self): def fit( self, - regression_model: sm.OLS | sm.GLM = sm.OLS, + regression_model: type[sm.OLS] | type[sm.GLM] = sm.OLS, **kwargs, ) -> None: """ diff --git a/tests/test_base.py b/tests/test_base.py new file mode 100644 index 0000000..cf825d5 --- /dev/null +++ b/tests/test_base.py @@ -0,0 +1,87 @@ +from collections.abc import Sequence + +import pytest +from pandas.core.api import DataFrame as DataFrame + +from multi_condition_comparisions.methods import LinearModelBase + + +@pytest.fixture +def MockLinearModel(): + class _MockLinearModel(LinearModelBase): + def _check_counts(self) -> None: + pass + + def fit(self, **kwargs) -> None: + pass + + def _test_single_contrast(self, contrast: Sequence[float], **kwargs) -> DataFrame: + pass + + return _MockLinearModel + + +@pytest.mark.parametrize( + "formula,cond_kwargs,expected_contrast", + [ + # single variable + ["~ condition", {}, [1, 0]], + ["~ condition", {"condition": "A"}, [1, 0]], + ["~ condition", {"condition": "B"}, [1, 1]], + ["~ condition", {"condition": "42"}, ValueError], # non-existant category + # no-intercept models + ["~ 0 + condition", {"condition": "A"}, [1, 0]], + ["~ 0 + condition", {"condition": "B"}, [0, 1]], + # Different way of specifying dummy coding + ["~ donor", {"donor": "D0"}, [1, 0, 0, 0]], + ["~ C(donor)", {"donor": "D0"}, [1, 0, 0, 0]], + ["~ C(donor, contr.treatment(base='D2'))", {"donor": "D2"}, [1, 0, 0, 0]], + ["~ C(donor, contr.treatment(base='D2'))", {"donor": "D0"}, [1, 1, 0, 0]], + # Handle continuous covariates + ["~ donor + continuous", {"donor": "D1"}, [1, 1, 0, 0, 0]], + ["~ donor + np.log1p(continuous)", {"donor": "D1"}, [1, 1, 0, 0, 0]], + ["~ donor + continuous + np.log1p(continuous)", {"donor": "D0"}, [1, 0, 0, 0, 0, 0]], + # Nonsense models repeating the same variable, which are nonetheless allowed by formulaic + ["~ donor + C(donor)", {"donor": "D1"}, [1, 1, 0, 0, 1, 0, 0]], + ["~ donor + C(donor, contr.treatment(base='D2'))", {"donor": "D0"}, [1, 0, 0, 0, 1, 0, 0]], + [ + "~ condition + donor + C(donor, contr.treatment(base='D2'))", + {"condition": "A"}, + ValueError, + ], # donor base category can't be resolved because it's ambiguous -> ValueError + # Sum2zero coding + ["~ C(donor, contr.sum)", {"donor": "D0"}, [1, 1, 0, 0]], + ["~ C(donor, contr.sum)", {"donor": "D3"}, [1, -1, -1, -1]], + # Multiple categorical variables + ["~ condition + donor", {"condition": "A"}, [1, 0, 0, 0, 0]], + ["~ condition + donor", {"donor": "D2"}, [1, 0, 0, 1, 0]], + ["~ condition + donor", {"condition": "B", "donor": "D2"}, [1, 1, 0, 1, 0]], + ["~ 0 + condition + donor", {"donor": "D1"}, [0, 0, 1, 0, 0]], + # Interaction terms + ["~ condition * donor", {"condition": "A"}, [1, 0, 0, 0, 0, 0, 0, 0]], + ["~ condition + donor + condition:donor", {"condition": "A"}, [1, 0, 0, 0, 0, 0, 0, 0]], + ["~ condition * donor", {"condition": "B", "donor": "D2"}, [1, 1, 0, 1, 0, 0, 1, 0]], + ["~ condition * C(donor, contr.treatment(base='D2'))", {"condition": "A"}, [1, 0, 0, 0, 0, 0, 0, 0]], + [ + "~ condition * C(donor, contr.treatment(base='D2'))", + {"condition": "B", "donor": "D0"}, + [1, 1, 1, 0, 0, 1, 0, 0], + ], + [ + "~ condition:donor", + {"condition": "A"}, + ValueError, + ], # Can't automatically resolve base category, because Formulaic builds a reduced-rank and full-rank factor internally + ["~ condition:donor", {"condition": "A", "donor": "D1"}, [1, 1, 0, 0, 0, 0, 0, 0]], + ["~ condition:C(donor)", {"condition": "A", "donor": "D1"}, [1, 1, 0, 0, 0, 0, 0, 0]], + ], +) +def test_model_cond(test_adata_minimal, MockLinearModel, formula, cond_kwargs, expected_contrast): + mod = MockLinearModel(test_adata_minimal, formula) + if isinstance(expected_contrast, type): + with pytest.raises(expected_contrast): + mod.cond(**cond_kwargs) + else: + actual_contrast = mod.cond(**cond_kwargs) + assert actual_contrast.tolist() == expected_contrast + assert actual_contrast.index.tolist() == mod.design.columns.tolist() diff --git a/tests/test_edger.py b/tests/test_edger.py index 521403b..b55fb36 100644 --- a/tests/test_edger.py +++ b/tests/test_edger.py @@ -12,7 +12,7 @@ def test_edger_simple(test_adata): """ method = EdgeR(adata=test_adata, design="~condition") method.fit() - res_df = method.test_contrasts(["condition", "A", "B"]) + res_df = method.test_contrasts(method.contrast("condition", "A", "B")) assert len(res_df) == test_adata.n_vars @@ -24,7 +24,7 @@ def test_edger_complex(test_adata): test_adata.obs["condition1"] = test_adata.obs["condition"].copy() method = EdgeR(adata=test_adata, design="~condition1+group") method.fit() - res_df = method.test_contrasts(["condition1", "A", "B"]) + res_df = method.test_contrasts(method.contrast("condition1", "A", "B")) assert len(res_df) == test_adata.n_vars # Check that the index of the result matches the var_names of the AnnData object diff --git a/tests/test_formulaic.py b/tests/test_formulaic.py new file mode 100644 index 0000000..da64be5 --- /dev/null +++ b/tests/test_formulaic.py @@ -0,0 +1,209 @@ +import pandas as pd +import pytest +from formulaic.parser.types import Factor + +from multi_condition_comparisions._util.formulaic import ( + AmbiguousAttributeError, + FactorMetadata, + get_factor_storage_and_materializer, + resolve_ambiguous, +) + + +@pytest.mark.parametrize( + "formula,reorder_categorical,expected_factor_metadata", + [ + [ + "~ donor", + None, + {"donor": {"reduced_rank": True, "custom_encoder": False, "base": "D0"}}, + ], + [ + "~ donor", + {"donor": ["D2", "D1", "D0", "D3"]}, + {"donor": {"reduced_rank": True, "custom_encoder": False, "base": "D2"}}, + ], + [ + "~ C(donor)", + None, + {"C(donor)": {"reduced_rank": True, "custom_encoder": True, "base": "D0"}}, + ], + [ + "~ C(donor, contr.treatment(base='D2'))", + None, + {"C(donor, contr.treatment(base='D2'))": {"reduced_rank": True, "custom_encoder": True, "base": "D2"}}, + ], + [ + "~ C(donor, contr.sum)", + None, + {"C(donor, contr.sum)": {"reduced_rank": True, "custom_encoder": True, "base": "D3"}}, + ], + [ + "~ C(donor, contr.sum)", + {"donor": ["D1", "D0", "D3", "D2"]}, + {"C(donor, contr.sum)": {"reduced_rank": True, "custom_encoder": True, "base": "D2"}}, + ], + [ + "~ condition", + None, + {"condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}}, + ], + [ + "~ C(condition)", + None, + {"C(condition)": {"reduced_rank": True, "custom_encoder": True, "base": "A"}}, + ], + [ + "~ C(condition, contr.treatment(base='B'))", + None, + {"C(condition, contr.treatment(base='B'))": {"reduced_rank": True, "custom_encoder": True, "base": "B"}}, + ], + [ + "~ C(condition, contr.sum)", + None, + {"C(condition, contr.sum)": {"reduced_rank": True, "custom_encoder": True, "base": "B"}}, + ], + [ + "~ 0 + condition", + None, + {"condition": {"reduced_rank": False, "custom_encoder": False, "base": None}}, + ], + [ + "~ condition + donor", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "donor": {"reduced_rank": True, "custom_encoder": False, "base": "D0"}, + }, + ], + [ + "~ 0 + condition + donor", + None, + { + "condition": {"reduced_rank": False, "custom_encoder": False, "base": None}, + "donor": {"reduced_rank": True, "custom_encoder": False, "base": "D0"}, + }, + ], + [ + "~ condition * donor", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "donor": {"reduced_rank": True, "custom_encoder": False, "base": "D0"}, + }, + ], + [ + "~ condition * C(donor, contr.treatment(base='D2'))", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "C(donor, contr.treatment(base='D2'))": {"reduced_rank": True, "custom_encoder": True, "base": "D2"}, + }, + ], + [ + "~ condition + C(condition) + C(condition, contr.treatment(base='B'))", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "C(condition)": {"reduced_rank": True, "custom_encoder": True, "base": "A"}, + "C(condition, contr.treatment(base='B'))": {"reduced_rank": True, "custom_encoder": True, "base": "B"}, + }, + ], + [ + "~ condition + continuous + np.log(continuous)", + None, + { + "condition": { + "reduced_rank": True, + "custom_encoder": False, + "base": "A", + "kind": Factor.Kind.CATEGORICAL, + }, + "continuous": { + "reduced_rank": False, + "custom_encoder": False, + "base": None, + "kind": Factor.Kind.NUMERICAL, + }, + "np.log(continuous)": { + "reduced_rank": False, + "custom_encoder": False, + "base": None, + "kind": Factor.Kind.NUMERICAL, + }, + }, + ], + [ + "~ condition * donor + continuous", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "donor": {"reduced_rank": True, "custom_encoder": False, "base": "D0"}, + "continuous": { + "reduced_rank": False, + "custom_encoder": False, + "base": None, + "kind": Factor.Kind.NUMERICAL, + }, + }, + ], + [ + "~ condition:donor", + None, + { + "condition": {"reduced_rank": True, "custom_encoder": False, "base": "A"}, + "donor": { + "custom_encoder": False, + "drop_field": "D0", + }, # `reduced_rank` and `base` will be ambigous here because Formulaic generates both version of the factor internally + }, + ], + ], +) +def test_custom_materializer(test_adata_minimal, formula, reorder_categorical, expected_factor_metadata): + """Test that the custom materializer correctly stores the baseline category. + + Parameters + ---------- + test_adata_minimal + adata fixture + formula + Formula to test + reorder_categorical + Create a pandas categorical for a given column with a certain order of categories + expected_factor_metadata + dict with expected values for each factor + """ + if reorder_categorical is not None: + for col, order in reorder_categorical.items(): + test_adata_minimal.obs[col] = pd.Categorical(test_adata_minimal.obs[col], categories=order) + factor_storage, _, materializer = get_factor_storage_and_materializer() + materializer(test_adata_minimal.obs, record_factor_metadata=True).get_model_matrix(formula) + for factor, expected_metadata in expected_factor_metadata.items(): + actual_metadata = factor_storage[factor] + for k in expected_metadata: + assert resolve_ambiguous(actual_metadata, k) == expected_metadata[k] + + +def test_resolve_ambiguous(): + obj1 = FactorMetadata("F1", True, True, ["A", "B"], Factor.Kind.CATEGORICAL) + obj2 = FactorMetadata("F2", True, False, ["A", "B"], Factor.Kind.CATEGORICAL) + obj3 = FactorMetadata("F3", True, False, None, Factor.Kind.NUMERICAL) + + with pytest.raises(ValueError): + resolve_ambiguous([], "foo") + + with pytest.raises(AttributeError): + resolve_ambiguous([obj1, obj2], "doesntexist") + + with pytest.raises(AmbiguousAttributeError): + assert resolve_ambiguous([obj1, obj2], "name") + + assert resolve_ambiguous([obj1, obj2, obj3], "reduced_rank") is True + assert resolve_ambiguous([obj1, obj2], "categories") == ["A", "B"] + + with pytest.raises(AmbiguousAttributeError): + assert resolve_ambiguous([obj1, obj2, obj3], "categories") + + with pytest.raises(AmbiguousAttributeError): + assert resolve_ambiguous([obj1, obj3], "kind")