-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzohar_preprocess_file.py
178 lines (121 loc) · 5.21 KB
/
zohar_preprocess_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import docx
import io
import os
import re
import argparse
from regexes import REGEXES
class Options:
    """Per-file processing options.

    Bundles the input path, the chunking mode, the language code and the
    optional chunk-size limit for a single file run.
    """

    def __init__(self, path, mode, lang, n=None):
        self.path = path    # path to the input .docx file
        self.mode = mode    # chunking mode: 'paragraphs' | 'sentences' | 'chars' | 'joined'
        self.lang = lang    # language code used to pick the regex set
        self.n = n          # max chars per chunk; only meaningful for mode == 'chars'
class Document:
    """Docx content plus article metadata pulled from an asset object."""

    def __init__(self, paragraphs, article_id, asset):
        self.paragraphs = paragraphs
        self._id = article_id
        self._title = asset.title
        self.lang = asset.lang
        self.size = len(asset.content)

    def title(self):
        """Build a filesystem-friendly name: title text before the first '|' or '-',
        stripped, spaces dashed, with the article id appended."""
        base = self._title.split('|')[0]
        base = base.split('-')[0]
        base = base.strip().replace(' ', '-')
        return f'{base}-{self._id}'
def parse_docx(content):
    """Convert raw docx bytes into a list of paragraph strings.

    Returns an empty list when the bytes cannot be opened as a docx package.
    """
    with io.BytesIO(content) as fileobj:
        try:
            doc = docx.Document(fileobj)
        except Exception:
            # Was a bare `except:`, which would also swallow KeyboardInterrupt
            # and SystemExit; `Exception` still covers all parse failures on
            # malformed input while letting interpreter-exit signals through.
            return []
        return [p.text for p in doc.paragraphs]
def keep_paragraph(p):
    """Keep paragraphs that start with a digit or contain more than eight words."""
    starts_with_digit = p[:1].isdigit()
    is_long_enough = len(p.split()) > 8
    return starts_with_digit or is_long_enough
def paragraphs(content):
    """Render docx bytes as plain text, one kept paragraph per line.

    Internal newlines inside a paragraph are flattened to spaces; paragraphs
    failing keep_paragraph are dropped.
    """
    kept = (p.replace('\n', ' ') for p in parse_docx(content) if keep_paragraph(p))
    return '\n'.join(kept)
def regex_keep(output, regexes):
    """Return the set of character indexes covered by any match of *regexes* in *output*."""
    covered = set()
    for pattern in regexes:
        compiled = re.compile(pattern, flags=re.MULTILINE)
        for match in compiled.finditer(output):
            covered |= set(range(match.start(), match.end()))
    return covered
def regex_split(output, regexes):
    """Return the set of last-character indexes of every match of *regexes* in *output*."""
    end_indexes = set()
    for pattern in regexes:
        compiled = re.compile(pattern, flags=re.MULTILINE)
        end_indexes.update(m.end() - 1 for m in compiled.finditer(output))
    return end_indexes
def split_by_indexes(output, keep, split):
    """Yield the pieces of *output* cut after each index in ``split - keep``.

    *keep* holds indexes where splitting is forbidden; *split* holds the
    candidate last-character indexes of each piece. Pieces are stripped.
    """
    cut_points = sorted(set(split) - set(keep))
    bounds = [-1, *cut_points, len(output)]
    for start, end in zip(bounds, bounds[1:]):
        yield output[start + 1:end + 1].strip()
def split_sentences(output, lang):
    """Split *output* into one sentence per line using the language's regex sets.

    The per-language SENTENCES_KEEP / SENTENCES_SPLIT patterns come from the
    regex_{lang}.py modules collected in REGEXES.
    """
    lang_regexes = REGEXES[lang]
    protected = regex_keep(output, lang_regexes.SENTENCES_KEEP)
    boundaries = regex_split(output, lang_regexes.SENTENCES_SPLIT)
    return '\n'.join(split_by_indexes(output, protected, boundaries))
def split_characters(output, lang, n_chars):
    """Yield chunks of *output* of at most roughly *n_chars* characters.

    Never splits mid-word (accumulation happens on whitespace-delimited
    pieces) and flushes the current chunk whenever a line matching the
    language-specific ITEM regex begins, so items are never split either.
    NOTE(review): pieces from consecutive lines are concatenated with no
    separator in between — presumably intentional, worth confirming.
    """
    item_pattern = re.compile(REGEXES[lang].ITEM)
    whitespace = re.compile(r'(\s+)')
    buf = ''
    for line in output.split('\n'):
        # Start a fresh chunk at every ITEM boundary.
        if item_pattern.match(line) and buf:
            yield buf.strip()
            buf = ''
        for piece in whitespace.split(line):
            buf += piece
            if len(buf) > n_chars:
                yield buf.strip()
                buf = ''
    if buf:
        yield buf.strip()
def replace(output, lang):
    """Apply the language's (pattern, replacement) regex pairs to *output* in order."""
    for pattern, repl in REGEXES[lang].REPLACE:
        output = re.sub(pattern, repl, output, flags=re.MULTILINE)
    return output
def process(options, postfix):
    """Preprocess each file described in *options* and write the result next to it.

    Pipeline per file: read docx bytes -> extract paragraphs -> chunk according
    to opt.mode ('paragraphs' needs no extra step) -> run replacement regexes ->
    write to ``opt.path + postfix`` as UTF-8.
    """
    for opt in options:
        with open(opt.path, 'rb') as fin:
            text = paragraphs(fin.read())
            if opt.mode == 'sentences':
                text = split_sentences(text, opt.lang)
            elif opt.mode == 'chars':
                text = '\n'.join(split_characters(text, opt.lang, opt.n))
            elif opt.mode == 'joined':
                text = ' '.join(text.split())
            text = replace(text, opt.lang)
            with open(opt.path + postfix, 'w', encoding='utf-8') as fout:
                fout.write(text)
def main():
    """Parse CLI arguments and preprocess the target and source docx files."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", default='he', help="source language")
    parser.add_argument("--target", default='en', help="target language")
    parser.add_argument("target_file")
    parser.add_argument("source_file")
    parser.add_argument("--chunk", choices=['paragraphs', 'sentences', 'chars', 'joined'], default='paragraphs')
    # type=int is required: without it a CLI-supplied value arrives as str and
    # the `len(chunk) > n_chars` comparison in split_characters raises TypeError.
    parser.add_argument("--n_chars_tgt", type=int, help='number of chars in a target phrase', default=255)
    parser.add_argument("--n_chars_src", type=int, help='number of chars in a source phrase', default=225)
    parser.add_argument("--postfix", help='postfix (attached to filename at the end)', default='.txt')
    args = parser.parse_args()
    opts = [Options(args.target_file, args.chunk, args.target, args.n_chars_tgt),
            Options(args.source_file, args.chunk, args.source, args.n_chars_src)]
    process(opts, args.postfix)
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()