-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
294 lines (263 loc) · 11.1 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# Conjunto de funciones e utilidades para el procesamiento de datos
# Autor: Josue Peña Atencio
# Ultima modificación: 26 de Julio 2021
from pandas import concat, DataFrame
from copy import deepcopy
# Retornar las primeras y ultimas 'cant' filas de un dataframe 'df'
def filter_rows(df, cant):
data_frames = [df.iloc[0:cant], df.iloc[len(df)-cant:len(df)]]
return concat(data_frames)
# Aplanar el contenido de diccionarios que contienen
# información pre-procesada de pacientes en conjuntos de fácil acceso X, y
# Dp: datos de pacientes que no curaron
# Dn: datos de pacientes que curaron
def flatten(Dp, Dn):
X = list()
y = list()
D = {**Dp, **Dn}
for ID in D.keys():
for cita in D[ID]:
X.append(D[ID][cita])
if ID in Dp:
y.append(1)
elif ID in Dn:
y.append(0)
else:
raise Exception("Paciente con resultado de tratamiento inconcluso")
return X, y
# filename: nombre de archivo en el cual guardar el automata
# M: automata (diccionario)
# start_states: conjunto de estados iniciales
# end_states: conjunto de estados finales o de aceptación
def write_automata(filename, M, start_states, end_states):
alphabet = set()
for state in M:
for symb in M[state]:
alphabet.add(symb)
with open(filename,'w') as tf:
tf.write("# Alfabeto\n")
tf.write("{" + ",".join(map(str,sorted(alphabet))) + "}\n")
tf.write("\n")
tf.write("# Numero de estados\n")
tf.write(f"{len(M)}\n")
tf.write("# Estados iniciales\n")
tf.write(f"{len(start_states)} ")
tf.write(" ".join(map(str,sorted(start_states))) + "\n")
tf.write("# Estados finales\n")
tf.write(f"{len(end_states)} " + " ".join(map(str,sorted(end_states))) + "\n")
tf.write("# Descripcion de las transiciones\n")
n_trans = 0 # Cantidad de transiciones
for s in M: n_trans += len(M[s])
tf.write(f"{n_trans}\n")
for state_1 in sorted(M.keys()):
for symbol in sorted(M[state_1].keys()):
for state_2 in sorted(M[state_1][symbol]):
tf.write(f"{state_1} {symbol} {state_2}\n")
# filename: nombre de archivo del automata a cargar
def load_automata(filename):
with open(filename,'r') as tf:
results = list(tf)
#alphabet = set(map(int,results[1].strip()[1:-1].split(',')[1:]))
start_states = set(map(int,results[6].strip().split()[1:]))
end_states = set(map(int,results[8].strip().split()[1:]))
M = {}
# Caso borde: incluso si no hay ninguna transicion, el diccionario del automata queda bien formado
for s in start_states:
M[s] = dict()
for s in end_states:
M[s] = dict()
# Añadir transiciones
for i in range(11,len(results)):
v,s,w = map(int,results[i].strip().split())
if v not in M:
M[v] = {s:set([w])}
elif s not in M[v]:
M[v][s] = set([w])
else:
M[v][s].add(w)
if w not in M:
M[w] = dict()
return M, start_states, end_states
# filename: nombre de archivo al cual escribir la muestra
# alphabet_size: tamaño del alfabeto de símbolos de la muestra
# X: palabras de pacientes
# y: etiqueta de cada palabra (1 ó 0)
def write_sample(filename, alphabet_size, X, y):
with open(filename,'w') as tf:
# Cantidad de muestras y cantidad de símbolos
tf.write(f"{len(X)} {alphabet_size}\n")
for i in range(len(X)):
tf.write(f"{y[i]} {len(X[i])} ")
for s in X[i]:
tf.write(f"{s} ")
tf.write("\n")
# filename: nombre del archivo de muestra a cargar
def load_sample(filename):
alphabet = set()
X = list()
y = list()
with open(filename,'r') as tf:
results = list(tf)
for w in results[1:]:
word = tuple(map(int,w[3:].split()))
X.append(word)
for c in word:
alphabet.add(c)
if w[0] == '1':
y.append(1)
elif w[0] == '0':
y.append(0)
else:
raise Exception("Etiqueta de palabra incorrecta")
return alphabet, X, y
# M: automata (diccionario) estado -> simbolo -> estado(s)
# start_states: estados iniciales
# end_states: estados finales (de aceptación)
# X: lista de palabras a probar en en el automata
def predict(M, start_states, end_states, X):
pred = list()
for word in X:
states_curr = start_states.copy()
states_next = set()
incomplete = False
for symbol in word:
for state in sorted(states_curr):
if symbol in M[state]:
states_next.update(M[state][symbol])
# No se pudo hacer ninguna trancisión a partir del símbolo actual
# y no se ha terminado de consumir la cadena
if (len(states_next) == 0):
incomplete = True
break
states_curr = states_next.copy()
states_next = set()
if not incomplete:
if len(states_curr & end_states) > 0:
# Aceptar la cadena
pred.append(1)
else:
# Rechazar la cadena
pred.append(0)
else:
pred.append(0)
return pred
# Preparar datos para técnicas
# Crear alfabeto y palabras de cada paciente para cada cita
# x_patients: datos de expresión génica de pacientes de célula 'x' (neutrofilos, eosinofilos o monocitos)
# x_genes: datos de genes célula 'x' de pacientes
# outcomes: resultados de tratamiento para cada paciente
# cant: cantidad de genes más sobre expresados e infra expresados a seleccionar, respectivamente
# (total de genes seleccionados = cant*2)
# cod: tipo de codificación de genes:
# 'binary' (genes sobre e infra expresados) o 'ternary' (sobre, infra y genes estables)
# Retorna dos diccionarios: Dp (muestras positivas), Dn (muestras negativas)
# Estructura: ID_de_paciente --> {C1 : palabra, C2 : palabra, C3 : palabra} (Cada CX es una cita)
# No todos los pacientes tienen las 3 citas
def pre_process(x_patients,x_genes,outcomes,cant,cod):
conflicts = dict()
fraction = (1/4)
# Obtener los génes más sobre e infra expresados.
x_genes_aux = filter_rows(x_genes, cant) # x_genes entra ordenado descendientemente según 'deseq_logfc'
x_patients_aux = x_patients.loc[x_patients.index.intersection(x_genes_aux.index)]
x_describe = x_patients_aux.apply(DataFrame.describe, axis=1)
Dp = dict() # Palabras positivas
Dn = dict() # Palabras negativas
# Construir palabras para cada paciente
for p_ID in x_patients_aux:
word = tuple()
# sector = n : cantidad total de genes usados
# Números del 1..n son genes infra exp; n+1..2n son genes sobre exp; 2n+1..3n son genes estables
# Si la codificación es binaria el tamaño del alfabeto es 2n
# Si es ternaria el tamaño del alfabeto es 3n
sector = cant*2
i = 1
gen_info = list()
for gen in x_patients_aux.index:
mRNA_amount = x_patients_aux[p_ID].loc[gen]
mean = x_describe.loc[gen]['mean']
std = x_describe.loc[gen]['std']
dist_inf = abs(mRNA_amount-(mean - std)) # Distancia de valor del gen a clasificación infra-expresado
dist_sob = abs(mRNA_amount-(mean + std)) # Distancia a sobre-expresado
dist_est_1 = abs(mRNA_amount-(mean - fraction*std)) # Distancia a borde inferior estable
dist_est_2 = abs(mRNA_amount-(mean + fraction*std)) # Distancia a borde superior estable
if cod=='binary':
gen_info.append([i, dist_inf, dist_sob])
else:
gen_info.append([i, dist_inf, dist_sob, dist_est_1, dist_est_2])
# Genes infra exp
if mRNA_amount <= (mean - std):
word += (i,)
# Genes sobre exp
elif mRNA_amount >= (mean + std):
word += (sector+i,)
gen_info[-1][0] += sector
# Genes estables
elif cod=='ternary' and ((mean - fraction*std) <= mRNA_amount <= (mean + fraction*std)):
word += ((2*sector)+i,)
gen_info[-1][0] += (2*sector)
i += 1
# Valor de salida del paciente p_ID (0.0 cura, 1.0 falla)
outcome = outcomes.loc['SU'+p_ID[1:-2]]['EF_LC_ESTADO_FINAL_ESTUDIO']
# Si la palabra queda vacía
if len(word) == 0:
# Gen más cercano a ser clasificado
total_min_1 = min(gen_info, key=lambda x : min(x[1:]))
gen_info.remove(total_min_1)
# Segundo mejor gen
# Con 2 genes es suficiente para diferenciar las palabras creadas mediante este método
# sin agregar muchos símbolos
total_min_2 = min(gen_info, key=lambda x : min(x[1:]))
gen_info.remove(total_min_2)
# Redondear distancias
total_min_1 = list(map(lambda x: round(x,3), total_min_1))
total_min_2 = list(map(lambda x: round(x,3), total_min_2))
index_1 = total_min_1.index(min(total_min_1))
index_2 = total_min_2.index(min(total_min_2))
symbol_1, symbol_2 = None, None
# Generar símbolos
if index_1 == 1: # Infra-expresado
symbol_1 = total_min_1[0]
elif index_1 == 2: # Sobre-expresado
symbol_1 = 2*total_min_1[0]
elif index_1 in [3,4]: # Estable
symbol_1 = 3*total_min_1[0]
if index_2 == 1:
symbol_2 = total_min_2[0]
elif index_2 == 2:
symbol_2 = 2*total_min_2[0]
elif index_2 in [3,4]:
symbol_2 = 3*total_min_2[0]
word = (symbol_1,symbol_2)
if word not in conflicts:
conflicts[word] = (0, outcome) # (cantidad de conflictos, última etiqueta detectada)
else:
if outcome != conflicts[word][1]:
conflicts[word] = (conflicts[word][0]+1,outcome)
# Clasificar palabra como positiva o negativa
normal_ID = p_ID[:-2]
cita = 'C'+p_ID[-1:]
if outcome == 1.0:
if normal_ID not in Dp:
Dp[normal_ID] = dict()
Dp[normal_ID][cita] = word
elif outcome == 0.0:
if normal_ID not in Dn:
Dn[normal_ID] = dict()
Dn[normal_ID][cita] = word
else:
raise Exception(f"El paciente tiene resultado de tratamiento inválido: {outcome}")
alphabet_size = 2*(cant*2)
if cod=='ternary':
alphabet_size = 3*(cant*2)
# Eliminar palabras inconsistentes
D = {**deepcopy(Dp), **deepcopy(Dn)}
for k in conflicts:
if conflicts[k][0] > 0:
for p_ID in D:
for cita in D[p_ID]:
if D[p_ID][cita] == k:
if p_ID in Dp:
del Dp[p_ID][cita]
else:
del Dn[p_ID][cita]
return Dp, Dn, alphabet_size