# cht_data.py
import sys
import gzip
import os
import random  # used by parse_test_snp() when shuffle=True

import numpy as np

import util

class TestSNP:
    def __init__(self, name, geno_hap1, geno_hap2, AS_target_ref, AS_target_alt,
                 hetps, totals, counts):
        self.name = name
        self.geno_hap1 = geno_hap1
        self.geno_hap2 = geno_hap2
        self.AS_target_ref = AS_target_ref
        self.AS_target_alt = AS_target_alt
        self.hetps = hetps
        self.totals = totals
        self.counts = counts

    def is_het(self):
        """Returns True if the test SNP is heterozygous"""
        return self.geno_hap1 != self.geno_hap2

    def is_homo_ref(self):
        """Returns True if the test SNP is homozygous for the reference allele"""
        return self.geno_hap1 == 0 and self.geno_hap2 == 0

    def is_homo_alt(self):
        """Returns True if the test SNP is homozygous for the non-reference allele"""
        return self.geno_hap1 == 1 and self.geno_hap2 == 1

# module-level flag so the duplicate-SNP warning is only emitted once
dup_snp_warn = True

def parse_test_snp(snpinfo, shuffle=False):
    global dup_snp_warn

    snp_id = snpinfo[2]
    tot = 0 if snpinfo[16] == "NA" else float(snpinfo[16])

    if snpinfo[6] == "NA":
        geno_hap1 = 0
        geno_hap2 = 0
    else:
        geno_hap1 = int(snpinfo[6].strip().split("|")[0])
        geno_hap2 = int(snpinfo[6].strip().split("|")[1])

    count = 0 if snpinfo[15] == "NA" else int(snpinfo[15])

    if snpinfo[9].strip() == "NA" or geno_hap1 == geno_hap2:
        # SNP is homozygous, so there is no AS info
        return TestSNP(snp_id, geno_hap1, geno_hap2, [], [], [], tot, count)
    else:
        # positions of target SNPs
        snp_locs = np.array([int(y.strip()) for y in snpinfo[9].split(';')])

        # counts of reads matching the reference allele at linked 'target' SNPs
        snp_as_ref = np.array([int(y) for y in snpinfo[12].split(';')])

        # counts of reads matching the alternate allele
        snp_as_alt = np.array([int(y) for y in snpinfo[13].split(';')])

        # heterozygote probabilities
        snp_hetps = np.array([np.float64(y.strip())
                              for y in snpinfo[10].split(';')])

        # linkage probabilities, not currently used
        snp_linkageps = np.array([np.float64(y.strip())
                                  for y in snpinfo[11].split(';')])

        # The same SNP should not be provided multiple times; this can
        # create problems with the combined test. Warn about and filter
        # duplicate SNPs.
        uniq_loc, uniq_idx = np.unique(snp_locs, return_index=True)
        if dup_snp_warn and uniq_loc.shape[0] != snp_locs.shape[0]:
            sys.stderr.write("WARNING: discarding SNPs that are repeated "
                             "multiple times in same line\n")
            # only warn once
            dup_snp_warn = False

        snp_as_ref = snp_as_ref[uniq_idx]
        snp_as_alt = snp_as_alt[uniq_idx]
        snp_hetps = snp_hetps[uniq_idx]

        # linkage probabilities currently not used
        snp_linkageps = snp_linkageps[uniq_idx]

        if shuffle:
            # permute allele-specific read counts by flipping them randomly
            # at each SNP
            for y in range(len(snp_as_ref)):
                if random.randint(0, 1) == 1:
                    snp_as_ref[y], snp_as_alt[y] = snp_as_alt[y], snp_as_ref[y]

        return TestSNP(snp_id, geno_hap1, geno_hap2, snp_as_ref,
                       snp_as_alt, snp_hetps, tot, count)
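
# Illustrative usage sketch (an assumption, not part of the original module):
# `fields` is one whitespace-split data row from a CHT input file, with the
# column layout relied on above (e.g. fields[2] = SNP id, fields[6] = phased
# genotype such as "0|1", fields[9]-[13] = semicolon-separated target-SNP info).
#
#     snp = parse_test_snp(fields, shuffle=False)
#     if snp.is_het():
#         as_total = sum(snp.AS_target_ref) + sum(snp.AS_target_alt)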

def open_input_files(in_filename):
    if not os.path.exists(in_filename) or not os.path.isfile(in_filename):
        raise IOError("input file %s does not exist or is not a "
                      "regular file\n" % in_filename)

    # read file that contains list of input files
    in_file = open(in_filename, "rt")

    infiles = []
    for line in in_file:
        # open each input file and read first line
        filename = line.rstrip()
        sys.stderr.write(" " + filename + "\n")

        if (not filename) or (not os.path.exists(filename)) or \
           (not os.path.isfile(filename)):
            sys.stderr.write("input file '%s' does not exist or is not a "
                             "regular file\n" % filename)
            sys.exit(2)

        if util.is_gzipped(filename):
            f = gzip.open(filename, "rt")
        else:
            f = open(filename, "rt")

        # skip the header line
        f.readline()

        infiles.append(f)

    in_file.close()

    if len(infiles) == 0:
        sys.stderr.write("no input files specified in file '%s'\n" % in_filename)
        sys.exit(2)

    return infiles
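
# Sketch of the expected layout of the file passed to open_input_files()
# (an assumption based on the code above; the paths are hypothetical): one
# per-individual CHT input file path per line. Plain-text and gzipped files
# are both accepted, and the header line of each listed file is consumed
# here so that callers start at the first data row.
#
#     /path/to/NA18507.cht.txt.gz
#     /path/to/NA18508.cht.txt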

def read_count_matrices(input_filename, shuffle=False, skip=0,
                        min_counts=0, min_as_counts=0, sample=0):
    """Given a file that contains paths to the input files for all
    individuals, returns a matrix of observed read counts and a matrix
    of expected read counts."""
    infiles = open_input_files(input_filename)

    is_finished = False

    count_matrix = []
    expected_matrix = []

    line_num = 0
    skip_num = 0
    while not is_finished:
        is_comment = False
        line_num += 1

        count_line = []
        expected_line = []

        num_as = 0

        for i in range(len(infiles)):
            # read next row from this input file
            line = infiles[i].readline().strip()

            if line.startswith("#") or line.startswith("CHROM"):
                # skip comment lines and the header line
                is_comment = True
            elif line:
                if is_finished:
                    raise IOError("All input files should have same number of "
                                  "lines. LINE %d is present in file %s, but "
                                  "not in all input files\n"
                                  % (line_num, infiles[i].name))
                if is_comment:
                    raise IOError("Comment and header lines should be consistent "
                                  "across all input files. LINE %d is a comment "
                                  "or header line in some input files but not in "
                                  "file %s" % (line_num, infiles[i].name))

                # parse test SNP and associated info from input file row
                new_snp = parse_test_snp(line.split(), shuffle=shuffle)

                if new_snp.is_het():
                    num_as += np.sum(new_snp.AS_target_ref) + \
                              np.sum(new_snp.AS_target_alt)

                count_line.append(new_snp.counts)
                expected_line.append(new_snp.totals)
            else:
                # out of lines from at least one file, assume we are finished
                is_finished = True

        if not is_finished and not is_comment:
            if skip_num < skip:
                # skip this row
                skip_num += 1
            else:
                if sum(count_line) >= min_counts and num_as >= min_as_counts:
                    # this line exceeded the minimum number of read counts
                    # and AS counts
                    count_matrix.append(count_line)
                    expected_matrix.append(expected_line)
                skip_num = 0

    count_matrix = np.array(count_matrix, dtype=int)
    expected_matrix = np.array(expected_matrix, dtype=np.float64)

    sys.stderr.write("count_matrix dimension: %s\n" % str(count_matrix.shape))
    sys.stderr.write("expect_matrix dimension: %s\n" % str(expected_matrix.shape))

    nrow = count_matrix.shape[0]
    if (sample > 0) and (sample < nrow):
        # randomly sample a subset of rows without replacement
        sys.stderr.write("randomly sampling %d target regions\n" % sample)

        samp_index = np.arange(nrow)
        np.random.shuffle(samp_index)
        samp_index = samp_index[:sample]

        count_matrix = count_matrix[samp_index, :]
        expected_matrix = expected_matrix[samp_index, :]

        sys.stderr.write("new count_matrix dimension: %s\n"
                         % str(count_matrix.shape))
        sys.stderr.write("new expect_matrix dimension: %s\n"
                         % str(expected_matrix.shape))

    return count_matrix, expected_matrix
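
# Minimal usage sketch (an assumption; the list-file name and thresholds are
# hypothetical):
#
#     count_mat, expect_mat = read_count_matrices("cht_input_files.txt",
#                                                 min_counts=50,
#                                                 min_as_counts=10)
#
# count_mat holds observed region read counts (rows = target regions that
# passed the filters, columns = individuals), and expect_mat holds the
# corresponding expected counts.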