diff --git a/0.preprocess-sites/scripts/site_processing_utils.py b/0.preprocess-sites/scripts/site_processing_utils.py
index f512c89..f802679 100644
--- a/0.preprocess-sites/scripts/site_processing_utils.py
+++ b/0.preprocess-sites/scripts/site_processing_utils.py
@@ -67,6 +67,12 @@ def load_features(core, example_dir):
     feature_df = feature_df.query("feature_name not in @meta_df.feature_name")
     feature_df = pd.concat([meta_df, feature_df]).reset_index(drop=True)
 
+    # Add dtype column specification
+    col_dtype = [
+        "str" if x.startswith("Metadata") else "float" for x in feature_df.feature_name
+    ]
+    feature_df = feature_df.assign(col_dtype=col_dtype)
+
     return feature_df
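The new `col_dtype` column records an explicit dtype for every feature, so that metadata columns load as strings and CellProfiler measurements as floats. A minimal sketch of the intended downstream use (the file paths here are hypothetical; the `feature_name`/`col_dtype` columns follow the change above):

    import pandas as pd

    # Hypothetical feature file written with the new col_dtype column
    feature_df = pd.read_csv("data/example_features.tsv", sep="\t")

    # Build a column -> dtype map and apply it when reading single cell data
    dtype_dict = dict(zip(feature_df.feature_name, feature_df.col_dtype))
    sc_df = pd.read_csv("data/example_site.csv.gz", dtype=dtype_dict)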
diff --git a/1.generate-profiles/0.merge-single-cells.py b/1.generate-profiles/0.merge-single-cells.py
index 36de3a7..fa5f410 100644
--- a/1.generate-profiles/0.merge-single-cells.py
+++ b/1.generate-profiles/0.merge-single-cells.py
@@ -150,9 +150,11 @@
         ["Metadata_Foci_site"] + all_feature_df.feature_name.tolist(), axis="columns"
     )
 
-    # Merge single cell profiles and metadata
+    # Merge single cell profiles and metadata info containing cell assignments
+    # A left merge ensures that we retain only cells that pass the quality threshold
+    common_cols = list(set(metadata_df.columns).intersection(set(sc_merged_df.columns)))
     sc_merged_df = metadata_df.merge(
-        sc_merged_df, on=merge_info["metadata_linking_columns"], how="left"
+        sc_merged_df, on=common_cols, how="left"
     ).reset_index(drop=True)
 
     if single_file_only:
diff --git a/1.generate-profiles/1.aggregate.py b/1.generate-profiles/1.aggregate.py
index dfdd238..6e92eab 100644
--- a/1.generate-profiles/1.aggregate.py
+++ b/1.generate-profiles/1.aggregate.py
@@ -4,10 +4,14 @@
 import argparse
 import warnings
 import pandas as pd
+import dask.dataframe as dd
 
 from pycytominer import aggregate
 from pycytominer.cyto_utils import output
 
+sys.path.append("recipe")
+from scripts.profile_utils import aggregate_pooled, approx_aggregate_piecewise
+
 sys.path.append("config")
 from utils import parse_command_args, process_configuration
@@ -24,7 +28,6 @@
     experiment_config=experiment_config_file,
 )
 
-
 # Extract config arguments
 perform = config["options"]["profile"]["aggregate"]["perform"]
@@ -41,10 +44,12 @@
 single_cell_file = config["files"]["single_file_only_output_file"]
 single_cell_site_files = config["files"]["single_cell_site_files"]
+prefilter_file = config["files"]["prefilter_file"]
 aggregate_output_files = config["files"]["aggregate_files"]
 
 sc_config = config["options"]["profile"]["single_cell"]
 aggregate_from_single_file = sc_config["output_one_single_cell_file_only"]
+prefilter_features = sc_config["prefilter_features"]
 
 aggregate_args = config["options"]["profile"]["aggregate"]
 aggregate_operation = aggregate_args["operation"]
@@ -59,45 +64,72 @@
     single_cell_file.exists()
 ), "Error! The single cell file does not exist! Check 0.merge-single-cells.py"
 
+# Load preselected features
+all_feature_df = pd.read_csv(prefilter_file, sep="\t")
+
+if prefilter_features:
+    all_feature_df = all_feature_df.query("not prefilter_column")
+
 # Load single cell data
+aggregate_output_dir.mkdir(parents=True, exist_ok=True)
 if aggregate_from_single_file:
     print(f"Loading one single cell file: {single_cell_file}")
     single_cell_df = pd.read_csv(single_cell_file, sep=",")
+
+    # Perform the aggregation based on the defined levels and columns
+    for aggregate_level, aggregate_columns in aggregate_levels.items():
+        aggregate_output_file = aggregate_output_files[aggregate_level]
+
+        print(
+            f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}"
+        )
+
+        aggregate(
+            population_df=single_cell_df,
+            strata=aggregate_columns,
+            features=aggregate_features,
+            operation=aggregate_operation,
+            output_file=aggregate_output_file,
+            compression_options=compression,
+            float_format=float_format,
+        )
+
 else:
     sites = list(single_cell_site_files)
     print(f"Now loading data from {len(sites)} sites")
 
-    single_cell_df = []
-    for site in sites:
-        site_file = single_cell_site_files[site]
-        if site_file.exists():
-            site_df = pd.read_csv(site_file, sep=",")
-            single_cell_df.append(site_df)
-        else:
-            warnings.warn(
-                f"{site_file} does not exist. There must have been an error in processing"
-            )
-
-    single_cell_df = pd.concat(single_cell_df, axis="rows").reset_index(drop=True)
-
-# Perform the aggregation based on the defined levels and columns
-aggregate_output_dir.mkdir(parents=True, exist_ok=True)
-for aggregate_level, aggregate_columns in aggregate_levels.items():
-    aggregate_output_file = aggregate_output_files[aggregate_level]
-
-    print(
-        f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}"
+    # Process the feature dtype dictionary for dask loading
+    cp_feature_df = all_feature_df.query("col_dtype == 'float'")
+    dtype_dict = dict(zip(cp_feature_df.feature_name, cp_feature_df.col_dtype))
+    cp_features = cp_feature_df.feature_name.tolist()
+
+    # Initialize dask loading
+    sc_df = dd.read_csv(
+        list(single_cell_site_files.values()),
+        blocksize=None,
+        assume_missing=True,
+        dtype=dtype_dict,
     )
 
-    aggregate_df = aggregate(
-        population_df=single_cell_df,
-        strata=aggregate_columns,
-        features=aggregate_features,
-        operation=aggregate_operation,
-    )
-
-    output(
-        aggregate_df,
-        output_filename=aggregate_output_file,
-        compression=compression,
-        float_format=float_format,
-    )
+    # Perform the aggregation based on the defined levels and columns
+    for aggregate_level, aggregate_columns in aggregate_levels.items():
+        aggregate_output_file = aggregate_output_files[aggregate_level]
+
+        print(f"Now aggregating by {aggregate_level}...with dask operation mean")
+
+        # Dask currently only supports mean
+        agg_df = (
+            sc_df.loc[:, aggregate_columns + cp_features]
+            .groupby(aggregate_columns)
+            .mean()
+            .compute()
+            .reset_index()
+        )
+
+        # Output to file
+        output(
+            agg_df,
+            output_filename=aggregate_output_file,
+            compression_options=compression,
+            float_format=float_format,
+        )
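The dask branch above streams the site files lazily instead of concatenating them all in pandas, which keeps memory flat for large batches; as the inline comment notes, only a mean aggregation is supported on this path. A minimal sketch under assumed file names and one assumed feature column:

    import dask.dataframe as dd

    # blocksize=None loads each file as a single partition (required for gzip CSVs);
    # assume_missing=True reads unspecified integer columns as floats so NaNs are safe
    sc_df = dd.read_csv(
        ["site_1.csv.gz", "site_2.csv.gz"],
        blocksize=None,
        assume_missing=True,
        dtype={"Cells_AreaShape_Area": "float"},
    )

    # Lazy groupby-mean; nothing is read from disk until .compute()
    agg_df = (
        sc_df.loc[:, ["Metadata_Foci_Barcode_MatchedTo_GeneCode", "Cells_AreaShape_Area"]]
        .groupby(["Metadata_Foci_Barcode_MatchedTo_GeneCode"])
        .mean()
        .compute()
        .reset_index()
    )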
diff --git a/1.generate-profiles/2.normalize.py b/1.generate-profiles/2.normalize.py
index 66e03ae..f1138be 100644
--- a/1.generate-profiles/2.normalize.py
+++ b/1.generate-profiles/2.normalize.py
@@ -64,6 +64,6 @@
         samples=normalize_by_samples,
         method=normalize_method,
         output_file=output_file,
-        compression=compression,
+        compression_options=compression,
         float_format=float_format,
     )
diff --git a/1.generate-profiles/3.feature-select.py b/1.generate-profiles/3.feature-select.py
index c52e0c3..cb20d1e 100644
--- a/1.generate-profiles/3.feature-select.py
+++ b/1.generate-profiles/3.feature-select.py
@@ -72,6 +72,6 @@
         na_cutoff=feature_select_nacutoff,
         corr_threshold=feature_select_corr_threshold,
         output_file=output_file,
-        compression=compression,
+        compression_options=compression,
         float_format=float_format,
     )
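Both keyword fixes above (and the matching calls in 1.aggregate.py) track the pycytominer output helpers, which take `compression_options` rather than `compression`. A small hedged sketch of the call shape, with a hypothetical one-row profile:

    import pandas as pd
    from pycytominer.cyto_utils import output

    # Hypothetical profile, just to show the renamed keyword
    profile_df = pd.DataFrame(
        {"Metadata_gene": ["NT"], "Cells_AreaShape_Area": [1.0]}
    )
    output(
        profile_df,
        output_filename="example_profile.csv.gz",
        compression_options="gzip",
        float_format="%.5g",
    )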
diff --git a/1.generate-profiles/profiling_config.yaml b/1.generate-profiles/profiling_config.yaml
deleted file mode 100644
index cd772c6..0000000
--- a/1.generate-profiles/profiling_config.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
----
-main_config:
-  step: generate-profiles
-  pipeline: Pooled Cell Painting
-  project_tag: 2018_11_20_Periscope_Calico
----
-core:
-  batch: 20190919_6W_CP074A
-  project_dir: /Users/gway/work/projects/
-  site_dir: ../0.preprocess-sites/data/
-  output_single_cell_dir: single_cell/
-  output_profile_dir: profiles/
-  output_one_single_cell_file_only: false
-  categorize_cell_quality: simple
-  compression: gzip
-  float_format: "%.5g"
-  compartments:
-    - Cells
-    - Nuclei
-    - Cytoplasm
-  parent_cols:
-    cells:
-      - Parent_Nuclei
-    cytoplasm:
-      - Parent_Nuclei
-      - Parent_Cells
-    spots:
-      - Parent_Cells
-  id_cols:
-    - ImageNumber
-    - ObjectNumber
-  control_barcodes:
-    - NT
-  ignore_files:
-    - .DS_Store
----
-single_cell:
-  perform: true
-  force_overwrite: false
-  prefilter_features: true
-  filter_cell_quality:
-    - Perfect
-    - Great
-  cell_quality_column: Metadata_Foci_Cell_Quality
-  output_basedir: data/single_cell
-  merge_columns:
-    image_column: ImageNumber
-    linking_compartment: Cytoplasm
-    linking_columns:
-      cells: Metadata_Cytoplasm_Parent_Cells
-      nuclei: Metadata_Cytoplasm_Parent_Nuclei
-  metadata_linking_columns:
-    - Metadata_Foci_site
-    - Metadata_Cells_ObjectNumber
----
-aggregate:
-  perform: true
-  output_basedir: data/profiles
-  operation: median
-  features: infer
-  levels:
-    gene:
-      - Metadata_Foci_Barcode_MatchedTo_GeneCode
-    guide:
-      - Metadata_Foci_Barcode_MatchedTo_GeneCode
-      - Metadata_Foci_Barcode_MatchedTo_Barcode
----
-normalize:
-  perform: true
-  output_basedir: data/profiles
-  method: standardize
-  levels:
-    - gene
-    - guide
-    - single_cell
-  by_samples: all
-  features: infer
----
-feature_select:
-  perform: true
-  output_basedir: data/profiles
-  operations:
-    - variance_threshold
-    - drop_na_columns
-    - blacklist
-    - drop_outliers
-  levels:
-    - gene
-    - guide
-    - single_cell
-  drop_samples: none
-  features: infer
-  na_cutoff: 0
-  corr_threshold: 0.9
diff --git a/scripts/profile_utils.py b/scripts/profile_utils.py
index 34997f0..6aae2bd 100644
--- a/scripts/profile_utils.py
+++ b/scripts/profile_utils.py
@@ -1,4 +1,6 @@
 import pandas as pd
+from pycytominer import aggregate
+from pycytominer.cyto_utils import infer_cp_features
 
 
 def sanitize_gene_col(metadata_df, gene_col, control_barcodes):
@@ -18,3 +20,63 @@ def sanitize_gene_col(metadata_df, gene_col, control_barcodes):
     metadata_df.loc[:, gene_col] = genes
 
     return metadata_df.copy()
+
+
+def aggregate_pooled(site_df, strata, features, operation):
+    """
+    Aggregate each pooled cell painting site independently
+
+    The arguments are the same as provided to pycytominer.aggregate()
+
+    Returns:
+        A dataframe of aggregated site-level profiles with perturbation count
+    """
+
+    site_counts_df = (
+        site_df.loc[:, strata]
+        .value_counts()
+        .reset_index()
+        .rename(columns={0: "Metadata_pert_count"})
+    )
+
+    site_df = aggregate(site_df, strata=strata, features=features, operation=operation)
+
+    site_df = site_counts_df.merge(site_df, left_on=strata, right_on=strata)
+
+    return site_df
+
+
+def get_prop(df_group, pert_col="Metadata_pert_count"):
+    """
+    Get the proportion of total single cells that a specific site contributed.
+    Applied to a pandas.DataFrame.groupby() on the aggregated column level
+    """
+    pert_count = df_group[pert_col]
+    prop_result = pd.DataFrame(pert_count / pert_count.sum()).rename(
+        columns={pert_col: "Metadata_pert_prop"}
+    )
+    return prop_result.merge(df_group, left_index=True, right_index=True)
+
+
+def approx_aggregate_piecewise(df, agg_cols, pert_col="Metadata_pert_count"):
+    """
+    Given previously aggregated files concatenated with a column indicating count,
+    further aggregate.
+    """
+    meta_features = infer_cp_features(df, metadata=True)
+    cp_features = df.drop(meta_features, axis="columns").columns.tolist()
+
+    agg_df = (
+        df.groupby(agg_cols)
+        .apply(lambda x: get_prop(x, pert_col=pert_col))
+        .reset_index(drop=True)
+    )
+
+    agg_df = agg_df.loc[:, meta_features].merge(
+        agg_df.loc[:, cp_features].multiply(agg_df.Metadata_pert_prop, axis="rows"),
+        left_index=True,
+        right_index=True,
+    )
+
+    agg_df = agg_df.groupby(agg_cols).sum().reset_index()
+    return agg_df
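approx_aggregate_piecewise re-weights previously aggregated site profiles by the number of cells each site contributed, then sums within each group, so count-weighted site means reproduce the pooled mean exactly (for a median, as configured upstream, the result is only an approximation, hence the name). A toy check of the arithmetic, using hypothetical column values:

    import pandas as pd

    # Two sites measured the same guide: (10 * 2.0 + 30 * 4.0) / 40 == 3.5
    df = pd.DataFrame(
        {
            "Metadata_guide": ["A", "A"],
            "Metadata_pert_count": [10, 30],
            "Cells_AreaShape_Area": [2.0, 4.0],
        }
    )
    weights = df.Metadata_pert_count / df.Metadata_pert_count.sum()
    pooled_mean = (df.Cells_AreaShape_Area * weights).sum()
    assert pooled_mean == 3.5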