import csv
import datetime
import os
import re
import time
from collections import Counter

import numpy as np
import pandas as pd
import spacy

def get_tok():
    try:
        return spacy.load('de_core_news_md')
    except OSError:
        print('Downloading language model for the spaCy POS tagger\n'
              "(don't worry, this will only happen once)")
        from spacy.cli import download
        download('de_core_news_md')
        return spacy.load('de_core_news_md')


# Tokenize a phrase after lowercasing, umlaut transliteration,
# punctuation removal and whitespace normalization.
def tokenize(tok, text):
    return [token.lemma_ for token in tok.tokenizer(filter_text(text))]

def filter_text(text):
    text = str(text)
    text = text.lower()
    text = umlaute(text)
    text = special_chars(text)
    text = filter_space(text)
    return text


def umlaute(input):
    # Transliterate German umlauts to their ASCII digraphs.
    return (input.replace("ä", "ae").replace("Ä", "ae")
            .replace("ö", "oe").replace("Ö", "oe")
            .replace("ü", "ue").replace("Ü", "ue"))


def special_chars(input):
    # Strip brackets and punctuation characters.
    return (input.replace("(", "").replace(")", "").replace("!", "")
            .replace("[", "").replace("]", "").replace("/", "")
            .replace(",", "").replace(".", ""))


def filter_space(input):
    # Collapse runs of spaces into a single space.
    return re.sub(' +', ' ', input)
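

# Illustrative example (input chosen here, not taken from the original data):
# filter_text("Über (Köln)!") lowercases, transliterates the umlauts and strips
# the punctuation, yielding "ueber koeln".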


def labelClasses(df):
    # Map each distinct code to a contiguous integer class index (0..K-1).
    idxToCodes = {}
    i = 0
    for codes in df['code']:
        if codes not in idxToCodes:
            idxToCodes[codes] = i
            i = i + 1
    return idxToCodes


def loadDataset(file):
    df = pd.read_csv(file, delimiter=';', index_col=None)
    df.columns = ['code', 'phrase']
    return df


def loadDataset_idx(file, word_idx, threshold):
    df = pd.read_csv(file, delimiter=';', index_col=None)
    df.columns = ['code', 'phrase']
    code_counter = Counter(df['code'])
    df['code_occurrence'] = df['code'].apply(lambda x: code_counter[x])
    # Remove concepts with a low occurrence
    discard = df[df['code_occurrence'] < threshold]
    df = df[df['code_occurrence'] >= threshold]
    # Add phrase length and drop empty phrases
    df['phrases_length'] = df['phrase'].apply(lambda x: len(str(x).split()))
    df = df[df['phrases_length'] > 0]
    # Class indices
    idx = labelClasses(df)
    df['class'] = df['code'].apply(lambda x: idx[x])
    tok = get_tok()
    df['encoded'] = df['phrase'].apply(lambda x: np.array(encode_sentence(tok, x, word_idx)))
    return df, discard['code'].to_list()


def loadAugmentation(file, word_idx, discards):
    df = pd.read_csv(file, delimiter=';', index_col=None)
    df.columns = ['code', 'phrase']
    # Remove the discarded concepts from the augmentation data
    df = df[~df['code'].isin(discards)]
    code_counter = Counter(df['code'])
    df['code_occurrence'] = df['code'].apply(lambda x: code_counter[x])
    # Add phrase length and drop empty phrases
    df['phrases_length'] = df['phrase'].apply(lambda x: len(str(x).split()))
    df = df[df['phrases_length'] > 0]
    # Class indices
    idx = labelClasses(df)
    df['class'] = df['code'].apply(lambda x: idx[x])
    tok = get_tok()
    df['encoded'] = df['phrase'].apply(lambda x: np.array(encode_sentence(tok, x, word_idx)))
    return df


def encode_sentence(tok, text, vocab2index, N=10):
    tokenized = tokenize(tok, text)
    # Fixed-length encoding: pad with zeros and truncate to N tokens.
    encoded = np.zeros(N, dtype=int)
    # Unknown words fall back to the "UNK" index; guard against a vocabulary
    # without an "UNK" entry instead of swallowing the error.
    unk = vocab2index.get("UNK", 0)
    enc1 = np.array([vocab2index.get(word, unk) for word in tokenized], dtype=int)
    length = min(N, len(enc1))
    encoded[:length] = enc1[:length]
    return encoded
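

# Illustrative example (index values assumed, not from the original data): a phrase
# whose tokens map to [5, 1, 7] is returned as [5, 1, 7, 0, 0, 0, 0, 0, 0, 0] for the
# default N=10; phrases longer than N tokens are cut off at N.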


def encodeDataset(df):
    tok = get_tok()
    counts = Counter()
    for index, row in df.iterrows():
        counts.update(tokenize(tok, row['phrase']))
    # Drop rare words (fewer than 2 occurrences) from the vocabulary
    for word in list(counts):
        if counts[word] < 2:
            del counts[word]
    vocab2index = {"": 0, "UNK": 1}
    words = ["", "UNK"]
    for word in counts:
        vocab2index[word] = len(words)
        words.append(word)
    df['encoded'] = df['phrase'].apply(lambda x: np.array(encode_sentence(tok, x, vocab2index)))
    return df, words, vocab2index


def encodeDataset_with_index(idx, df):
    tok = get_tok()
    df['encoded'] = df['phrase'].apply(lambda x: np.array(encode_sentence(tok, x, idx)))
    return df


def construct_idx(df):
    tok = get_tok()
    counts = Counter()
    for index, row in df.iterrows():
        counts.update(tokenize(tok, row['phrase']))
    # Drop rare words (fewer than 2 occurrences) from the vocabulary
    for word in list(counts):
        if counts[word] < 2:
            del counts[word]
    vocab2index = {"": 0, "UNK": 1}
    words = ["", "UNK"]
    for word in counts:
        vocab2index[word] = len(words)
        words.append(word)
    # Persist the vocabulary as "word;index" rows
    with open('data/idx.csv', 'w', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=';')
        for key, value in vocab2index.items():
            writer.writerow([key, value])


def load_idx(file):
    # Read the "word;index" rows back into a dict; the indices are cast to int
    # so that encode_sentence can build integer arrays from them.
    with open(file, mode='r') as infile:
        reader = csv.reader(infile, delimiter=';')
        idx = dict((rows[0], int(rows[1])) for rows in reader)
    return idx


def mk_result_dir(dir_path):
    try:
        os.mkdir(dir_path)
        return dir_path
    except FileExistsError:
        # If the directory already exists, append a timestamp and retry.
        new_path = dir_path + "_" + datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d-%H-%M-%S')
        new_path = mk_result_dir(new_path)
        return new_path
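

# Minimal usage sketch (illustrative only). The CSV path 'data/dataset.csv' and the
# output directory 'results' are assumptions, not names used elsewhere in this module;
# adjust them to the actual project layout.
if __name__ == "__main__":
    raw = loadDataset('data/dataset.csv')
    encoded_df, words, vocab2index = encodeDataset(raw)
    print(f"encoded {len(encoded_df)} phrases with a vocabulary of {len(words)} words")
    out_dir = mk_result_dir('results')
    print(f"results directory: {out_dir}")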