Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to combine aggregated site files #70

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions 0.preprocess-sites/scripts/site_processing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def load_features(core, example_dir):
feature_df = feature_df.query("feature_name not in @meta_df.feature_name")
feature_df = pd.concat([meta_df, feature_df]).reset_index(drop=True)

# Add dtype column specification
col_dtype = [
"str" if x.startswith("Metadata") else "float" for x in feature_df.feature_name
]
feature_df = feature_df.assign(col_dtype=col_dtype)

return feature_df


Expand Down
6 changes: 4 additions & 2 deletions 1.generate-profiles/0.merge-single-cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@
["Metadata_Foci_site"] + all_feature_df.feature_name.tolist(), axis="columns"
)

# Merge single cell profiles and metadata
# Merge single cell profiles and metadata info containing cell assignments
# A left merge ensures that we retain only cells that pass the quality threshold
common_cols = list(set(metadata_df.columns).intersection(set(sc_merged_df.columns)))
sc_merged_df = metadata_df.merge(
sc_merged_df, on=merge_info["metadata_linking_columns"], how="left"
sc_merged_df, on=common_cols, how="left"
).reset_index(drop=True)

if single_file_only:
Expand Down
98 changes: 65 additions & 33 deletions 1.generate-profiles/1.aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import argparse
import warnings
import pandas as pd
import dask.dataframe as dd

from pycytominer import aggregate
from pycytominer.cyto_utils import output

sys.path.append("recipe")
from scripts.profile_utils import aggregate_pooled, approx_aggregate_piecewise

sys.path.append("config")
from utils import parse_command_args, process_configuration

Expand All @@ -24,7 +28,6 @@
experiment_config=experiment_config_file,
)


# Extract config arguments
perform = config["options"]["profile"]["aggregate"]["perform"]

Expand All @@ -41,10 +44,12 @@

single_cell_file = config["files"]["single_file_only_output_file"]
single_cell_site_files = config["files"]["single_cell_site_files"]
prefilter_file = config["files"]["prefilter_file"]
aggregate_output_files = config["files"]["aggregate_files"]

sc_config = config["options"]["profile"]["single_cell"]
aggregate_from_single_file = sc_config["output_one_single_cell_file_only"]
prefilter_features = sc_config["prefilter_features"]

aggregate_args = config["options"]["profile"]["aggregate"]
aggregate_operation = aggregate_args["operation"]
Expand All @@ -59,45 +64,72 @@
single_cell_file.exists()
), "Error! The single cell file does not exist! Check 0.merge-single-cells.py"

# Load preselected features
all_feature_df = pd.read_csv(prefilter_file, sep="\t")

if prefilter_features:
all_feature_df = all_feature_df.query("not prefilter_column")

# Load single cell data
aggregate_output_dir.mkdir(parents=True, exist_ok=True)
if aggregate_from_single_file:
print(f"Loading one single cell file: {single_cell_file}")
single_cell_df = pd.read_csv(single_cell_file, sep=",")

# Perform the aggregation based on the defined levels and columns
for aggregate_level, aggregate_columns in aggregate_levels.items():
aggregate_output_file = aggregate_output_files[aggregate_level]

print(
f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}"
)

aggregate(
population_df=single_cell_df,
strata=aggregate_columns,
features=aggregate_features,
operation=aggregate_operation,
output_file=aggregate_output_file,
compression_options=compression,
float_format=float_format,
)

else:
sites = list(single_cell_site_files)
print(f"Now loading data from {len(sites)} sites")
single_cell_df = []
for site in sites:
site_file = single_cell_site_files[site]
if site_file.exists():
site_df = pd.read_csv(site_file, sep=",")
single_cell_df.append(site_df)
else:
warnings.warn(
f"{site_file} does not exist. There must have been an error in processing"
)

single_cell_df = pd.concat(single_cell_df, axis="rows").reset_index(drop=True)

# Perform the aggregation based on the defined levels and columns
aggregate_output_dir.mkdir(parents=True, exist_ok=True)
for aggregate_level, aggregate_columns in aggregate_levels.items():
aggregate_output_file = aggregate_output_files[aggregate_level]

print(
f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}"
# Process the feature dtype dictionary for dask loading
cp_feature_df = all_feature_df.query("col_dtype == 'float'")
dtype_dict = dict(zip(cp_feature_df.feature_name, cp_feature_df.col_dtype))
cp_features = cp_feature_df.feature_name.tolist()

# Initialize dask loading
sc_df = dd.read_csv(
list(single_cell_site_files.values()),
blocksize=None,
assume_missing=True,
dtype=dtype_dict,
)

aggregate_df = aggregate(
population_df=single_cell_df,
strata=aggregate_columns,
features=aggregate_features,
operation=aggregate_operation,
)

output(
aggregate_df,
output_filename=aggregate_output_file,
compression=compression,
float_format=float_format,
)
# Perform the aggregation based on the defined levels and columns
for aggregate_level, aggregate_columns in aggregate_levels.items():
aggregate_output_file = aggregate_output_files[aggregate_level]

print(f"Now aggregating by {aggregate_level}...with dask operation mean")

# Dask currently only supports mean
agg_df = (
sc_df.loc[:, aggregate_columns + cp_features]
.groupby(aggregate_columns)
.mean()
.compute()
.reset_index()
)

# Output to file
output(
agg_df,
output_filename=aggregate_output_file,
compression_options=compression,
float_format=float_format,
)
2 changes: 1 addition & 1 deletion 1.generate-profiles/2.normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@
samples=normalize_by_samples,
method=normalize_method,
output_file=output_file,
compression=compression,
compression_options=compression,
float_format=float_format,
)
2 changes: 1 addition & 1 deletion 1.generate-profiles/3.feature-select.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@
na_cutoff=feature_select_nacutoff,
corr_threshold=feature_select_corr_threshold,
output_file=output_file,
compression=compression,
compression_options=compression,
float_format=float_format,
)
94 changes: 0 additions & 94 deletions 1.generate-profiles/profiling_config.yaml

This file was deleted.

62 changes: 62 additions & 0 deletions scripts/profile_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pandas as pd
from pycytominer import aggregate
from pycytominer.cyto_utils import infer_cp_features


def sanitize_gene_col(metadata_df, gene_col, control_barcodes):
Expand All @@ -18,3 +20,63 @@ def sanitize_gene_col(metadata_df, gene_col, control_barcodes):

metadata_df.loc[:, gene_col] = genes
return metadata_df.copy()


def aggregate_pooled(site_df, strata, features, operation):
    """
    Aggregate each pooled cell painting site independently.

    Parameters are the same as provided to pycytominer.aggregate():
    site_df is the single-cell DataFrame for one site, strata the metadata
    columns to group by, features the feature columns to aggregate, and
    operation the aggregation function name (e.g. "median").

    Returns:
        A dataframe of aggregated site-level profiles with a
        "Metadata_pert_count" column holding the number of single cells
        contributing to each group.
    """

    # Count single cells per strata group.
    # Use reset_index(name=...) rather than rename(columns={0: ...}):
    # pandas >= 2.0 names the value_counts column "count" (not 0), so the
    # positional rename would silently leave the column misnamed.
    site_counts_df = (
        site_df.loc[:, strata]
        .value_counts()
        .reset_index(name="Metadata_pert_count")
    )

    # Aggregate the profiles with pycytominer (e.g. median per perturbation)
    site_df = aggregate(site_df, strata=strata, features=features, operation=operation)

    # Attach the per-group cell count to the aggregated profiles
    site_df = site_counts_df.merge(site_df, on=strata)

    return site_df


def get_prop(df_group, pert_col="Metadata_pert_count"):
    """
    Compute the proportion of total single cells that each site contributed.

    Intended to be applied to a pandas.DataFrame.groupby() at the aggregated
    column level; df_group must contain the pert_col count column.

    Returns df_group with a leading "Metadata_pert_prop" column whose values
    are each row's count divided by the group total.
    """
    counts = df_group[pert_col]
    # Normalize counts to fractions of the group total, as a one-column frame
    proportions = (counts / counts.sum()).to_frame(name="Metadata_pert_prop")
    # Index-aligned merge keeps the proportions matched to their source rows
    return proportions.merge(df_group, left_index=True, right_index=True)


def approx_aggregate_piecewise(df, agg_cols, pert_col="Metadata_pert_count"):
    """
    Given previously aggregated files concatenated with a column indicating count,
    further aggregate.

    Each group's features are weighted by that site's share of the group's
    total cell count (via get_prop) and then summed, producing an approximate
    count-weighted aggregate per agg_cols combination.
    """
    # Split columns into metadata vs. profile features
    meta_features = infer_cp_features(df, metadata=True)
    cp_features = df.drop(meta_features, axis="columns").columns.tolist()

    # Annotate every row with its within-group count proportion
    prop_df = (
        df.groupby(agg_cols)
        .apply(lambda group: get_prop(group, pert_col=pert_col))
        .reset_index(drop=True)
    )

    # Weight each feature row by its proportion of the group total
    weighted = prop_df.loc[:, cp_features].multiply(
        prop_df.Metadata_pert_prop, axis="rows"
    )
    combined = prop_df.loc[:, meta_features].merge(
        weighted, left_index=True, right_index=True
    )

    # Summing the weighted rows yields the approximate group-level profile
    return combined.groupby(agg_cols).sum().reset_index()