-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_utils.py
214 lines (169 loc) · 6.88 KB
/
data_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import os
import re
import spacy
import pdftotext
from tqdm import tqdm
from IPython import embed
class PdfExtractor:
    """Turn show-transcript PDFs into lists of speaker-attributed dialogue.

    A show is split into parts at every commercial-break marker, and each
    part is reduced to ``(speaker, dialogue, in_video)`` tuples by
    :meth:`clean_part`.
    """

    # Textual normalizations applied to the raw PDF text, in this order.
    # Order matters: e.g. "BEGN" is repaired to "BEGIN" before the BEGIN
    # marker rules run, and every BEGIN/END/COMMERCIAL marker is given a
    # trailing newline so the following text starts on its own line.
    _REPLACEMENTS = (
        # presumably repairs a PDF-extraction artifact — TODO confirm
        ("T ", "T"),
        ("BEGN", "BEGIN"),
        ("(CROSSTALK)", ""),
        ("(VIDEO CLIP PLAYS)", ""),
        ("(BEEP)", ""),
        ("(BEGIN VIDEO CLIP)", "(BEGIN VIDEO CLIP)\n"),
        ("(BEGIN VIDOE CLIP)", "(BEGIN VIDEO CLIP)\n"),
        ("(BEGIN VIDOEO CLIP)", "(BEGIN VIDEO CLIP)\n"),
        ("(END VIDEO CLIP)", "(END VIDEO CLIP)\n"),
        ("(COMMERCIAL BREAK)", "(COMMERCIAL BREAK)\n"),
        ("(BEGIN VIDEOTAPE)", "(BEGIN VIDEO CLIP)\n"),
        ("(BEGIN VIDEO CLIP,", "(BEGIN VIDEO CLIP)\n"),
    )

    # A speaker tag: a run of capitals, whitespace and a few punctuation
    # characters that ends in a colon (e.g. "HANNITY:").
    _SPEAKER_RE = re.compile(r"[A-Z\s\(\)\-\.,'\"\n]+:")

    # Lines starting with this text are treated as page footers.
    _PAGE_FOOTER_PREFIX = 'Content and Prog ramming Copyrig ht 2019'

    def __init__(self):
        """Load the spaCy pipelines, the marker strings and the name lists."""
        # NOTE(review): the 'en' shortcut only resolves on spaCy 2.x; spaCy 3
        # needs a full package name (e.g. 'en_core_web_sm') — confirm the
        # pinned version before upgrading.
        self.nlp = spacy.load('en')
        self.nlp_light = spacy.load('en', disable=['parser', 'tagger', 'ner'])
        self.commercial_sentinel = "(COMMERCIAL BREAK)"
        self.begin_clip = "(BEGIN VIDEO CLIP)"
        self.end_clip = "(END VIDEO CLIP)"
        self.load_names()

    def extract_single_show(self, filepath):
        """Extract one show from the PDF at ``filepath``.

        Returns:
            A list with one entry per commercial-break-delimited part; each
            entry is the list of dialogue tuples produced by
            :meth:`clean_part`.
        """
        with open(filepath, 'rb') as f:
            pdf = pdftotext.PDF(f)
            # Every page is prefixed with a single space, first page included.
            full_pdf = "".join(" " + page for page in pdf)
        for old, new in self._REPLACEMENTS:
            full_pdf = full_pdf.replace(old, new)
        parts = full_pdf.split(self.commercial_sentinel)
        return [self.clean_part(part) for part in parts]

    @staticmethod
    def _emit(dialogues, speaker, text, in_video):
        """Append ``(speaker, text, str(in_video))`` unless ``text`` is blank."""
        if text.strip() != "":
            dialogues.append((speaker, text, str(in_video)))

    def clean_part(self, part):
        """Split one part of a show into speaker-attributed dialogues.

        Args:
            part: Text of the part, with lines separated by ``'\\n'``.

        Returns:
            A list of ``(speaker, dialogue, in_video)`` tuples. ``speaker`` is
            the matched tag including its colon, ``in_video`` is the string
            ``"True"`` or ``"False"`` and tells whether the dialogue lies
            between a paired BEGIN/END video-clip marker.
        """
        lines = part.split('\n')

        # First pass: locate clip boundaries. A BEGIN line only counts once a
        # later END line closes it; an unclosed BEGIN is treated as ordinary
        # text by the second pass.
        start_idxs = set()
        end_idxs = set()
        start_idx = None
        for idx, line in enumerate(lines):
            if self.begin_clip in line:
                start_idx = idx
            elif self.end_clip in line:
                end_idxs.add(idx)
                # An END with no preceding BEGIN in this part used to raise
                # UnboundLocalError; now it simply has no start to pair with.
                if start_idx is not None:
                    start_idxs.add(start_idx)

        # Second pass: accumulate dialogue per speaker.
        dialogues = []
        in_video = False
        curr_dialogue = ""
        curr_speaker = None
        for idx, line in enumerate(lines):
            if line.startswith(self._PAGE_FOOTER_PREFIX):
                # Flush before dropping the speaker, otherwise the pending
                # dialogue would later be emitted with speaker None.
                self._emit(dialogues, curr_speaker, curr_dialogue, in_video)
                curr_dialogue = ""
                # Speakerless lines after the footer are skipped until the
                # next speaker tag.
                curr_speaker = None
                continue
            if line.strip() == "":
                continue
            if idx in start_idxs or idx in end_idxs:
                # A clip boundary closes the running dialogue; any other text
                # on the marker line itself is not added to a dialogue.
                self._emit(dialogues, curr_speaker, curr_dialogue, in_video)
                curr_speaker = None
                curr_dialogue = ""
                in_video = idx in start_idxs
                continue
            speaker, dialogue = self.get_speaker_and_dialogue(line)
            if speaker is not None:
                # New speaker tag: close the previous turn and start a new one.
                self._emit(dialogues, curr_speaker, curr_dialogue, in_video)
                curr_speaker = speaker
                curr_dialogue = dialogue
            elif curr_speaker is None:
                # Text that belongs to no known speaker is dropped.
                continue
            else:
                curr_dialogue += " " + dialogue.replace('--', ' ').strip()
        self._emit(dialogues, curr_speaker, curr_dialogue, in_video)
        return dialogues

    def get_speaker_and_dialogue(self, line):
        """Split ``line`` into a speaker tag and the text that follows it.

        Returns:
            ``(name, dialogue)``. When a speaker tag is found, ``name`` is the
            first match (colon included) and ``dialogue`` is everything after
            its first occurrence in ``line``; text before the tag is
            discarded. Otherwise ``name`` is ``None`` and ``dialogue`` is the
            stripped line.
        """
        match = self._SPEAKER_RE.search(line.strip())
        if match is None:
            return None, line.strip()
        name = match.group(0)
        # partition keeps the whole remainder even if the tag text repeats
        # later in the line (split(name)[1] used to cut it off there).
        dialogue = line.partition(name)[2]
        return name, dialogue

    def extract_all_shows(self, host):
        """Extract every file found in ``../data/pdfs/<host>``.

        Returns:
            A list of ``(filename, parts)`` tuples in ``os.listdir`` order.
        """
        host_dir = os.path.join('../data/pdfs', host)
        return [
            (file, self.extract_single_show(os.path.join(host_dir, file)))
            for file in os.listdir(host_dir)
        ]

    def get_all_sentences(self, shows):
        """Return the text of every spaCy sentence in every dialogue of ``shows``."""
        sentences = []
        for show in tqdm(shows):
            for part in show[1]:
                for dialogue in part:
                    doc = self.nlp(dialogue[1])
                    sentences.extend(sentence.text for sentence in doc.sents)
        return sentences

    def get_all_dialogues(self, shows):
        """Return the dialogue text (tuple item 1) of every dialogue in ``shows``."""
        return [
            dialogue[1]
            for show in shows
            for part in show[1]
            for dialogue in part
        ]

    def _load_name_tokens(self, path, target):
        """Tokenize each stripped line of ``path`` and append the token texts to ``target``."""
        with open(path) as f:
            for line in f:
                target.extend(
                    token.text for token in self.nlp_light(line.strip())
                )

    def load_names(self):
        """Populate ``hannity_names``, ``maddow_names`` and ``pbs_names``.

        Each list holds the token texts of the matching file under
        ``../data/names``, tokenized with the lightweight pipeline.
        """
        self.hannity_names = []
        self.maddow_names = []
        self.pbs_names = []
        self._load_name_tokens('../data/names/hannity.txt', self.hannity_names)
        self._load_name_tokens('../data/names/maddow.txt', self.maddow_names)
        self._load_name_tokens('../data/names/pbs.txt', self.pbs_names)
class DebateParser:
    """Parse debate transcripts written one turn per line.

    Each non-blank line is expected to look like
    ``speaker: [hh:]mm:ss text``, i.e. colon-separated fields where the
    seconds field is immediately followed by the spoken text.
    """

    def parse_file(self, filepath):
        """Parse the transcript at ``filepath``.

        Returns:
            A list of dicts with keys ``'text'``, ``'speaker'`` and ``'time'``
            (seconds computed as ``hour*3600 + minute*60 + second``). Colons
            inside the spoken text are replaced by spaces because the line is
            split on ``':'`` and re-joined with ``' '``.

        Raises:
            IndexError: If a non-blank line has too few colon-separated fields.
            ValueError: If a timestamp field is not an integer.
        """
        dialogues = []
        with open(filepath) as f:
            for line in f:
                if line.strip() == '':
                    continue
                splits = line.split(':')
                name = splits[0]
                if self.hhmmss(splits):
                    hour = int(splits[1].strip())
                    minute = int(splits[2].strip())
                    seconds_field, remainder = splits[3], splits[4:]
                else:
                    hour = 0
                    minute = int(splits[1].strip())
                    # The text continues in every field after the seconds
                    # field; starting at splits[4:] here used to drop the
                    # first segment following a colon in the text.
                    seconds_field, remainder = splits[2], splits[3:]
                # The first two characters of the field are the seconds, the
                # rest is the start of the spoken text.
                second = int(seconds_field[:2])
                text = ' '.join([seconds_field[2:]] + remainder).strip()
                dialogues.append({
                    'text': text,
                    'speaker': name,
                    'time': hour * 3600 + minute * 60 + second,
                })
        return dialogues

    def isint(self, text):
        """Return True if ``int(text)`` succeeds, False otherwise."""
        try:
            int(text)
        except (TypeError, ValueError):
            return False
        return True

    def hhmmss(self, splits):
        """Tell whether ``splits`` carries an ``hh:mm:ss`` timestamp.

        True when there are at least four fields, the third field is exactly
        two characters after stripping, and the fourth field starts with an
        integer; otherwise the line is treated as ``mm:ss``.
        """
        return (
            len(splits) >= 4
            and len(splits[2].strip()) == 2
            and self.isint(splits[3][:2])
        )