-
Notifications
You must be signed in to change notification settings - Fork 4
/
1.aggregate.py
167 lines (138 loc) · 6.05 KB
/
1.aggregate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import sys
import pathlib
import argparse
import warnings
import logging
import traceback
import pandas as pd
from pycytominer import aggregate
from pycytominer.cyto_utils import output
sys.path.append("config")
from utils import parse_command_args, process_configuration, get_split_aware_site_info
recipe_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.join(recipe_path, "scripts"))
from io_utils import read_csvs_with_chunksize
# ---- Logging setup -----------------------------------------------------
# This step logs to <recipe parent>/logs/1.aggregate.log; create the logs
# folder on first run.
log_dir = pathlib.Path(recipe_path).parent / "logs"
if not log_dir.is_dir():
    log_dir.mkdir()
logging.basicConfig(
    filename=str(log_dir / "1.aggregate.log"),
    level=logging.INFO,
)
def handle_excepthook(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception and echo its traceback to stdout.

    Installed as ``sys.excepthook`` so failures in this script land in the
    step log file as well as on the console.
    """
    logging.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
    frame_summaries = traceback.extract_tb(exc_traceback).format()
    print("Uncaught Exception: {}".format("\n".join(frame_summaries)))


# Route every uncaught exception through the logger above.
sys.excepthook = handle_excepthook
# ---- Experiment configuration ------------------------------------------
args = parse_command_args()
logging.info(f"Args used:{args}")

# Pull the command-line selections apart for readability.
batch_id, split_step = args.batch_id, args.split_step
options_config_file = args.options_config_file
experiment_config_file = args.experiment_config_file

# Resolve the merged options/experiment config for this step; sites that
# were incomplete or errored are reported back for logging only.
config, incomplete_sites, errored_sites = process_configuration(
    batch_id,
    step="profile--aggregate",
    options_config=options_config_file,
    experiment_config=experiment_config_file,
)
logging.info(f"Config used:{config}")
logging.info(f"Skipped incomplete sites during config processing: {incomplete_sites}")
logging.info(f"Skipped errored sites during config processing: {errored_sites}")
# ---- Extract config arguments ------------------------------------------
split_info = config["experiment"]["split"][split_step]
perform = config["options"]["profile"]["aggregate"]["perform"]

# Bail out early when this step is disabled in the options config.
if not perform:
    sys.exit("Config file set to perform=False, not performing {}".format(__file__))

core_options = config["options"]["core"]
ignore_files = core_options["ignore_files"]
float_format = core_options["float_format"]
compression = core_options["compression"]

directories = config["directories"]
input_spotdir = directories["preprocess"]["spots"]
single_cell_output_dir = directories["profile"]["single_cell"]
aggregate_output_dir = directories["profile"]["profiles"]

config_files = config["files"]
single_cell_file = config_files["single_file_only_output_file"]
single_cell_site_files = config_files["single_cell_site_files"]
aggregate_output_files = config_files["aggregate_files"]

sc_config = config["options"]["profile"]["single_cell"]
aggregate_from_single_file = sc_config["output_one_single_cell_file_only"]

aggregate_args = config["options"]["profile"]["aggregate"]
aggregate_operation = aggregate_args["operation"]
aggregate_features = aggregate_args["features"]
aggregate_levels = aggregate_args["levels"]
force = aggregate_args["force_overwrite"]  # NOTE(review): not referenced later in this script
print("Starting 1.aggregate.")
logging.info("Started 1.aggregate.")

# One candidate site per non-ignored entry in the spots directory.
sites = [
    spot_entry.name
    for spot_entry in input_spotdir.iterdir()
    if spot_entry.name not in ignore_files
]
# Group sites by data split (keys are split labels, values are site lists).
site_info_dict = get_split_aware_site_info(
    config["experiment"], sites, split_info, separator="___"
)
# Process each data split independently: every key of site_info_dict is one
# split label ("data_split_site") mapping to the sites belonging to it.
for data_split_site in site_info_dict:
    # Define a dataset specific file
    # Per-split single-cell file: the configured file name with the split
    # label inserted before the ".csv.gz" extension.
    single_cell_dataset_file = pathlib.Path(
        single_cell_output_dir,
        single_cell_file.name.replace(".csv.gz", f"_{data_split_site}.csv.gz"),
    )
    # Input argument flow control
    if aggregate_from_single_file:
        # NOTE(review): assert is stripped under `python -O`; a raised
        # FileNotFoundError would be a sturdier guard — confirm intent.
        assert (
            single_cell_dataset_file.exists()
        ), "Error! The single cell file does not exist! Check 0.merge-single-cells.py"
    # Load single cell data
    if aggregate_from_single_file:
        # Single-file mode: the merged per-split file written by the
        # previous step is read in one go (chunked reader for memory).
        print(f"Loading one single cell file: {single_cell_dataset_file}")
        single_cell_df = read_csvs_with_chunksize(single_cell_dataset_file, sep=",")
        logging.info(f"Loaded one single cell file: {single_cell_dataset_file}")
    else:
        # Multi-file mode: stitch the split's per-site CSVs together.
        sites = site_info_dict[data_split_site]
        print(f"Now loading data from {len(sites)} sites")
        logging.info(f"Loading data from {len(sites)} sites")
        single_cell_df = []
        for site in sites:
            site_file = single_cell_site_files[site]
            if site_file.exists():
                site_df = read_csvs_with_chunksize(site_file, sep=",")
                single_cell_df.append(site_df)
                print(f"Appended {site}")
                logging.info(f"Appended {site}")
            else:
                # Missing site output is tolerated (best-effort): warn and
                # continue with the sites that did process successfully.
                warnings.warn(
                    f"{site_file} does not exist. There must have been an error in processing"
                )
                logging.warning(f"{site_file} does not exist.")
        print(f"Making single cell dataframe.")
        logging.info(f"Making single cell dataframe.")
        # NOTE(review): if no site file existed, single_cell_df is an empty
        # list and pd.concat raises ValueError — consider guarding.
        single_cell_df = pd.concat(single_cell_df, axis="rows").reset_index(drop=True)
        print(f"Made single cell dataframe.")
        logging.info(f"Made single cell dataframe.")
    # Perform the aggregation based on the defined levels and columns
    aggregate_output_dir.mkdir(parents=True, exist_ok=True)
    for aggregate_level, aggregate_columns in aggregate_levels.items():
        aggregate_output_file = aggregate_output_files[aggregate_level]
        print(
            f"Now aggregating by {aggregate_level}...with operation: {aggregate_operation}"
        )
        logging.info(
            f"Aggregating by {aggregate_level}...with operation: {aggregate_operation}"
        )
        # pycytominer collapses single cells to one profile per unique
        # combination of the strata columns using the configured operation.
        aggregate_df = aggregate(
            population_df=single_cell_df,
            strata=aggregate_columns,
            features=aggregate_features,
            operation=aggregate_operation,
        )
        # Define a dataset specific file
        # Per-split output name, same tagging scheme as the input file above.
        aggregate_dataset_file = pathlib.Path(
            aggregate_output_dir,
            aggregate_output_file.name.replace(".csv.gz", f"_{data_split_site}.csv.gz"),
        )
        # pycytominer's writer applies the configured compression and float
        # formatting consistently across profile outputs.
        output(
            aggregate_df,
            output_filename=aggregate_dataset_file,
            compression_options=compression,
            float_format=float_format,
        )
# Final status banner, mirroring the start-of-step messages.
print("Finished 1.aggregate.")
logging.info("Finished 1.aggregate.")