From 311dcd7cc2c94da64d2a30f4c2838c75ff50b3f1 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Sat, 29 May 2021 17:26:32 -0400 Subject: [PATCH 1/7] add functionality to combine aggregated site files --- 1.generate-profiles/1.aggregate.py | 88 +++++++++++++++++++----------- scripts/profile_utils.py | 62 +++++++++++++++++++++ 2 files changed, 117 insertions(+), 33 deletions(-) diff --git a/1.generate-profiles/1.aggregate.py b/1.generate-profiles/1.aggregate.py index dfdd238..fdc18fb 100644 --- a/1.generate-profiles/1.aggregate.py +++ b/1.generate-profiles/1.aggregate.py @@ -8,6 +8,9 @@ from pycytominer import aggregate from pycytominer.cyto_utils import output +sys.path.append("recipe") +from scripts.profile_utils import aggregate_pooled_site, approx_aggregate_piecewise + sys.path.append("config") from utils import parse_command_args, process_configuration @@ -24,7 +27,6 @@ experiment_config=experiment_config_file, ) - # Extract config arguments perform = config["options"]["profile"]["aggregate"]["perform"] @@ -60,44 +62,64 @@ ), "Error! The single cell file does not exist! Check 0.merge-single-cells.py" # Load single cell data +aggregate_output_dir.mkdir(parents=True, exist_ok=True) if aggregate_from_single_file: print(f"Loading one single cell file: {single_cell_file}") single_cell_df = pd.read_csv(single_cell_file, sep=",") + + # Perform the aggregation based on the defined levels and columns + for aggregate_level, aggregate_columns in aggregate_levels.items(): + aggregate_output_file = aggregate_output_files[aggregate_level] + + print( + f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}" + ) + + aggregate( + population_df=single_cell_df, + strata=aggregate_columns, + features=aggregate_features, + operation=aggregate_operation, + output_file=aggregate_output_file, + compression_options=compression, + float_format=float_format, + ) + else: sites = list(single_cell_site_files) print(f"Now loading data from {len(sites)} sites") - single_cell_df = [] - for site in sites: - site_file = single_cell_site_files[site] - if site_file.exists(): - site_df = pd.read_csv(site_file, sep=",") - single_cell_df.append(site_df) - else: - warnings.warn( - f"{site_file} does not exist. There must have been an error in processing" + + for aggregate_level, aggregate_columns in aggregate_levels.items(): + aggregate_output_file = aggregate_output_files[aggregate_level] + + site_aggregated_df = [] + for site in sites: + site_file = single_cell_site_files[site] + if site_file.exists(): + site_df = pd.read_csv(site_file, sep=",") + else: + warnings.warn( + f"{site_file} does not exist. There must have been an error in processing" + ) + + # Aggregate each site individually + site_df = aggregate_pooled( + site_df=site_df, + strata=aggregate_columns, + features=aggregate_features, + operation=aggregate_operation, ) - single_cell_df = pd.concat(single_cell_df, axis="rows").reset_index(drop=True) + site_aggregated_df.append(site_df) -# Perform the aggregation based on the defined levels and columns -aggregate_output_dir.mkdir(parents=True, exist_ok=True) -for aggregate_level, aggregate_columns in aggregate_levels.items(): - aggregate_output_file = aggregate_output_files[aggregate_level] - - print( - f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}" - ) - - aggregate_df = aggregate( - population_df=single_cell_df, - strata=aggregate_columns, - features=aggregate_features, - operation=aggregate_operation, - ) - - output( - aggregate_df, - output_filename=aggregate_output_file, - compression=compression, - float_format=float_format, - ) + site_agg_df = pd.concat(site_aggregated_df).reset_index(drop=True) + site_agg_df = approx_aggregate_piecewise( + df=site_agg_df, agg_cols=aggregate_columns + ) + + output( + site_agg_df, + output_filename=aggregate_output_file, + compression_options=compression, + float_format=float_format, + ) diff --git a/scripts/profile_utils.py b/scripts/profile_utils.py index 34997f0..6aae2bd 100644 --- a/scripts/profile_utils.py +++ b/scripts/profile_utils.py @@ -1,4 +1,6 @@ import pandas as pd +from pycytominer import aggregate +from pycytominer.cyto_utils import infer_cp_features def sanitize_gene_col(metadata_df, gene_col, control_barcodes): @@ -18,3 +20,63 @@ def sanitize_gene_col(metadata_df, gene_col, control_barcodes): metadata_df.loc[:, gene_col] = genes return metadata_df.copy() + + +def aggregate_pooled(site_df, strata, features, operation): + """ + Aggregate each pooled cell painting site independently + + The arguments are the same as provided to pycytominer.aggregate() + + Returns: + A dataframe of aggregated site-level profiles with perturbation count + """ + + site_counts_df = ( + site_df.loc[:, strata] + .value_counts() + .reset_index() + .rename(columns={0: "Metadata_pert_count"}) + ) + + site_df = aggregate(site_df, strata=strata, features=features, operation=operation) + + site_df = site_counts_df.merge(site_df, left_on=strata, right_on=strata) + + return site_df + + +def get_prop(df_group, pert_col="Metadata_pert_count"): + """ + Get the proportion of total single cells that specific site contributed. + Applied to a pandas.DataFrame.groupby() on the aggregated column level + """ + pert_count = df_group[pert_col] + prop_result = pd.DataFrame(pert_count / pert_count.sum()).rename( + columns={pert_col: "Metadata_pert_prop"} + ) + return prop_result.merge(df_group, left_index=True, right_index=True) + + +def approx_aggregate_piecewise(df, agg_cols, pert_col="Metadata_pert_count"): + """ + Given previously aggregated files concatenated with a column indicating count, + further aggregate. + """ + meta_features = infer_cp_features(df, metadata=True) + cp_features = df.drop(meta_features, axis="columns").columns.tolist() + + agg_df = ( + df.groupby(agg_cols) + .apply(lambda x: get_prop(x, pert_col=pert_col)) + .reset_index(drop=True) + ) + + agg_df = agg_df.loc[:, meta_features].merge( + agg_df.loc[:, cp_features].multiply(agg_df.Metadata_pert_prop, axis="rows"), + left_index=True, + right_index=True, + ) + + agg_df = agg_df.groupby(agg_cols).sum().reset_index() + return agg_df From 0a2882fca8fee8e77c0bbbcecb2a29ef44324327 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Mon, 31 May 2021 13:49:30 +0000 Subject: [PATCH 2/7] fix import --- 1.generate-profiles/1.aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/1.generate-profiles/1.aggregate.py b/1.generate-profiles/1.aggregate.py index fdc18fb..5dffdc2 100644 --- a/1.generate-profiles/1.aggregate.py +++ b/1.generate-profiles/1.aggregate.py @@ -9,7 +9,7 @@ from pycytominer.cyto_utils import output sys.path.append("recipe") -from scripts.profile_utils import aggregate_pooled_site, approx_aggregate_piecewise +from scripts.profile_utils import aggregate_pooled, approx_aggregate_piecewise sys.path.append("config") from utils import parse_command_args, process_configuration From 94c709eedeb81a63daad1729d418b883c66280f6 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Mon, 31 May 2021 13:49:50 +0000 Subject: [PATCH 3/7] fix compression_options arg --- 1.generate-profiles/2.normalize.py | 2 +- 1.generate-profiles/3.feature-select.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/1.generate-profiles/2.normalize.py b/1.generate-profiles/2.normalize.py index 66e03ae..f1138be 100644 --- a/1.generate-profiles/2.normalize.py +++ b/1.generate-profiles/2.normalize.py @@ -64,6 +64,6 @@ samples=normalize_by_samples, method=normalize_method, output_file=output_file, - compression=compression, + compression_options=compression, float_format=float_format, ) diff --git a/1.generate-profiles/3.feature-select.py b/1.generate-profiles/3.feature-select.py index c52e0c3..cb20d1e 100644 --- a/1.generate-profiles/3.feature-select.py +++ b/1.generate-profiles/3.feature-select.py @@ -72,6 +72,6 @@ na_cutoff=feature_select_nacutoff, corr_threshold=feature_select_corr_threshold, output_file=output_file, - compression=compression, + compression_options=compression, float_format=float_format, ) From ff13e89a4c4bc2b2fc58c8d8b67d9dac0e181d39 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Mon, 31 May 2021 17:59:30 -0400 Subject: [PATCH 4/7] remove extraneous and confusing config file --- 1.generate-profiles/profiling_config.yaml | 94 ----------------------- 1 file changed, 94 deletions(-) delete mode 100644 1.generate-profiles/profiling_config.yaml diff --git a/1.generate-profiles/profiling_config.yaml b/1.generate-profiles/profiling_config.yaml deleted file mode 100644 index cd772c6..0000000 --- a/1.generate-profiles/profiling_config.yaml +++ /dev/null @@ -1,94 +0,0 @@ ---- -main_config: - step: generate-profiles - pipeline: Pooled Cell Painting - project_tag: 2018_11_20_Periscope_Calico ---- -core: - batch: 20190919_6W_CP074A - project_dir: /Users/gway/work/projects/ - site_dir: ../0.preprocess-sites/data/ - output_single_cell_dir: single_cell/ - output_profile_dir: profiles/ - output_one_single_cell_file_only: false - categorize_cell_quality: simple - compression: gzip - float_format: "%.5g" - compartments: - - Cells - - Nuclei - - Cytoplasm - parent_cols: - cells: - - Parent_Nuclei - cytoplasm: - - Parent_Nuclei - - Parent_Cells - spots: - - Parent_Cells - id_cols: - - ImageNumber - - ObjectNumber - control_barcodes: - - NT - ignore_files: - - .DS_Store ---- -single_cell: - perform: true - force_overwrite: false - prefilter_features: true - filter_cell_quality: - - Perfect - - Great - cell_quality_column: Metadata_Foci_Cell_Quality - output_basedir: data/single_cell - merge_columns: - image_column: ImageNumber - linking_compartment: Cytoplasm - linking_columns: - cells: Metadata_Cytoplasm_Parent_Cells - nuclei: Metadata_Cytoplasm_Parent_Nuclei - metadata_linking_columns: - - Metadata_Foci_site - - Metadata_Cells_ObjectNumber ---- -aggregate: - perform: true - output_basedir: data/profiles - operation: median - features: infer - levels: - gene: - - Metadata_Foci_Barcode_MatchedTo_GeneCode - guide: - - Metadata_Foci_Barcode_MatchedTo_GeneCode - - Metadata_Foci_Barcode_MatchedTo_Barcode ---- -normalize: - perform: true - output_basedir: data/profiles - method: standardize - levels: - - gene - - guide - - single_cell - by_samples: all - features: infer ---- -feature_select: - perform: true - output_basedir: data/profiles - operations: - - variance_threshold - - drop_na_columns - - blacklist - - drop_outliers - levels: - - gene - - guide - - single_cell - drop_samples: none - features: infer - na_cutoff: 0 - corr_threshold: 0.9 From 3eb9ca23a506e44bed4310cbbc6bb9888ef891e3 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Mon, 31 May 2021 18:01:43 -0400 Subject: [PATCH 5/7] reduce redundant columns also improve documentation --- 1.generate-profiles/0.merge-single-cells.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/1.generate-profiles/0.merge-single-cells.py b/1.generate-profiles/0.merge-single-cells.py index 36de3a7..fa5f410 100644 --- a/1.generate-profiles/0.merge-single-cells.py +++ b/1.generate-profiles/0.merge-single-cells.py @@ -150,9 +150,11 @@ ["Metadata_Foci_site"] + all_feature_df.feature_name.tolist(), axis="columns" ) - # Merge single cell profiles and metadata + # Merge single cell profiles and metadata info containing cell assignments + # A left merge ensures that we retain only cells that pass the quality threshold + common_cols = list(set(metadata_df.columns).intersection(set(sc_merged_df.columns))) sc_merged_df = metadata_df.merge( - sc_merged_df, on=merge_info["metadata_linking_columns"], how="left" + sc_merged_df, on=common_cols, how="left" ).reset_index(drop=True) if single_file_only: From b5dad04dff1ec78d2fdc90fe09ea942390591f6a Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Tue, 1 Jun 2021 12:17:56 -0400 Subject: [PATCH 6/7] use dask to hopefully prevent memory leak --- 1.generate-profiles/1.aggregate.py | 58 +++++++++++++++++------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/1.generate-profiles/1.aggregate.py b/1.generate-profiles/1.aggregate.py index 5dffdc2..6e92eab 100644 --- a/1.generate-profiles/1.aggregate.py +++ b/1.generate-profiles/1.aggregate.py @@ -4,6 +4,7 @@ import argparse import warnings import pandas as pd +import dask.dataframe as dd from pycytominer import aggregate from pycytominer.cyto_utils import output @@ -43,10 +44,12 @@ single_cell_file = config["files"]["single_file_only_output_file"] single_cell_site_files = config["files"]["single_cell_site_files"] +prefilter_file = config["files"]["prefilter_file"] aggregate_output_files = config["files"]["aggregate_files"] sc_config = config["options"]["profile"]["single_cell"] aggregate_from_single_file = sc_config["output_one_single_cell_file_only"] +prefilter_features = sc_config["prefilter_features"] aggregate_args = config["options"]["profile"]["aggregate"] aggregate_operation = aggregate_args["operation"] @@ -61,6 +64,12 @@ single_cell_file.exists() ), "Error! The single cell file does not exist! Check 0.merge-single-cells.py" +# Load preselected features +all_feature_df = pd.read_csv(prefilter_file, sep="\t") + +if prefilter_features: + all_feature_df = all_feature_df.query("not prefilter_column") + # Load single cell data aggregate_output_dir.mkdir(parents=True, exist_ok=True) if aggregate_from_single_file: @@ -89,36 +98,37 @@ sites = list(single_cell_site_files) print(f"Now loading data from {len(sites)} sites") + # Process the feature dtype dictionary for dask loading + cp_feature_df = all_feature_df.query("col_dtype == 'float'") + dtype_dict = dict(zip(cp_feature_df.feature_name, cp_feature_df.col_dtype)) + cp_features = cp_feature_df.feature_name.tolist() + + # Initialize dask loading + sc_df = dd.read_csv( + list(single_cell_site_files.values()), + blocksize=None, + assume_missing=True, + dtype=dtype_dict, + ) + + # Perform the aggregation based on the defined levels and columns for aggregate_level, aggregate_columns in aggregate_levels.items(): aggregate_output_file = aggregate_output_files[aggregate_level] - site_aggregated_df = [] - for site in sites: - site_file = single_cell_site_files[site] - if site_file.exists(): - site_df = pd.read_csv(site_file, sep=",") - else: - warnings.warn( - f"{site_file} does not exist. There must have been an error in processing" - ) - - # Aggregate each site individually - site_df = aggregate_pooled( - site_df=site_df, - strata=aggregate_columns, - features=aggregate_features, - operation=aggregate_operation, - ) - - site_aggregated_df.append(site_df) - - site_agg_df = pd.concat(site_aggregated_df).reset_index(drop=True) - site_agg_df = approx_aggregate_piecewise( - df=site_agg_df, agg_cols=aggregate_columns + print(f"Now aggregating by {aggregate_level}...with dask operation mean") + + # Dask currently only supports mean + agg_df = ( + sc_df.loc[:, aggregate_columns + cp_features] + .groupby(aggregate_columns) + .mean() + .compute() + .reset_index() ) + # Output to file output( - site_agg_df, + agg_df, output_filename=aggregate_output_file, compression_options=compression, float_format=float_format, From 7de05a9c56794813f75f60e855c5b0527b7c6252 Mon Sep 17 00:00:00 2001 From: gwaygenomics Date: Tue, 1 Jun 2021 12:18:16 -0400 Subject: [PATCH 7/7] add dtype specification --- 0.preprocess-sites/scripts/site_processing_utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/0.preprocess-sites/scripts/site_processing_utils.py b/0.preprocess-sites/scripts/site_processing_utils.py index f512c89..f802679 100644 --- a/0.preprocess-sites/scripts/site_processing_utils.py +++ b/0.preprocess-sites/scripts/site_processing_utils.py @@ -67,6 +67,12 @@ def load_features(core, example_dir): feature_df = feature_df.query("feature_name not in @meta_df.feature_name") feature_df = pd.concat([meta_df, feature_df]).reset_index(drop=True) + # Add dtype column specification + col_dtype = [ + "str" if x.startswith("Metadata") else "float" for x in feature_df.feature_name + ] + feature_df = feature_df.assign(col_dtype=col_dtype) + return feature_df