"""
Greynir: Natural language processing for Icelandic
High-level tokenizer and named entity recognizer
Copyright (C) 2023 Miðeind ehf.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see http://www.gnu.org/licenses/.
This module exports recognize_entities(), a function which
adds a named entity recognition layer on top of the reynir.bintokenizer
functionality.
Named entity recognition requires access to the SQL scraper database
and is thus not appropriate for inclusion in reynir.bintokenizer,
as GreynirEngine does not (and should not) require a database to be present.
"""
from typing import DefaultDict, List, Iterator, Dict, Union, Tuple, Optional, Type
from collections import defaultdict
import logging
from tokenizer import TOK, Tok
from tokenizer.abbrev import Abbreviations
from reynir.bindb import GreynirBin
from db import SessionContext, OperationalError, Session
from db.models import Entity
def recognize_entities(
token_stream: Iterator[Tok],
enclosing_session: Optional[Session] = None,
token_ctor: Type[TOK] = TOK,
) -> Iterator[Tok]:
"""Parse a stream of tokens looking for (capitalized) entity names
The algorithm implements N-token lookahead where N is the
length of the longest entity name having a particular initial word.
Adds a named entity recognition layer on top of the
reynir.bintokenizer.tokenize() function."""
# Token queue
tq: List[Tok] = []
# Phrases we're considering. Note that an entry of None
# indicates that the accumulated phrase so far is a complete
# and valid known entity name.
state: Dict[Union[str, None], List[Tuple[List[str], Entity]]] = defaultdict(list)
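    # For example, if the scraper database contains the entity
    # 'Hillary Rodham Clinton' and the token 'Hillary' has just been seen,
    # the state maps 'Rodham' -> [(['Clinton'], <that entity>)];
    # once 'Rodham' and 'Clinton' have also been consumed, the state
    # contains a None key, marking the accumulated phrase as complete.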
    # Entity definition cache
ecache: Dict[str, List[Entity]] = dict()
# Last name to full name mapping ('Clinton' -> 'Hillary Clinton')
lastnames: Dict[str, Tok] = dict()
with GreynirBin.get_db() as db, SessionContext(
session=enclosing_session, commit=True, read_only=True
) as session:
def fetch_entities(w: str, fuzzy: bool = True) -> List[Entity]:
"""Return a list of entities matching the word(s) given,
exactly if fuzzy = False, otherwise also as a starting word(s)"""
try:
q = session.query(Entity.name, Entity.verb, Entity.definition)
if fuzzy:
q = q.filter(Entity.name.like(w + " %") | (Entity.name == w))
else:
q = q.filter(Entity.name == w)
return q.all()
except OperationalError as e:
logging.warning(f"SQL error in fetch_entities(): {e}")
return []
def query_entities(w: str) -> List[Entity]:
"""Return a list of entities matching the initial word given"""
e = ecache.get(w)
if e is None:
ecache[w] = e = fetch_entities(w)
return e
def lookup_lastname(lastname: str) -> Optional[Tok]:
"""Look up a last name in the lastnames registry,
eventually without a possessive 's' at the end, if present"""
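            # Example: once 'Hillary Rodham Clinton' has been registered,
            # both 'Clinton' and the possessive form 'Clintons' resolve
            # to the full-name token.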
fullname = lastnames.get(lastname)
if fullname is not None:
# Found it
return fullname
# Try without a possessive 's', if present
if lastname.endswith("s"):
return lastnames.get(lastname[0:-1])
# Nope, no match
return None
def flush_match():
"""Flush a match that has been accumulated in the token queue"""
if len(tq) == 1 and lookup_lastname(tq[0].txt) is not None:
# If single token, it may be the last name of a
# previously seen entity or person
return token_or_entity(tq[0])
            # Reconstruct the original text of the accumulated phrase
ename = " ".join([t.txt for t in tq])
# We don't include the definitions in the token - they should be looked up
# on the fly when processing or displaying the parsed article
return token_ctor.Entity(ename)
def token_or_entity(token: Tok) -> Tok:
"""Return a token as-is or, if it is a last name of a person
that has already been mentioned in the token stream by full name,
refer to the full name"""
assert token.txt[0].isupper()
tfull = lookup_lastname(token.txt)
if tfull is None:
# Not a last name of a previously seen full name
return token
if tfull.kind != TOK.PERSON:
# Return an entity token with no definitions
# (this will eventually need to be looked up by full name when
# displaying or processing the article)
return token_ctor.Entity(token.txt)
# Return the full name meanings
return token_ctor.Person(token.txt, tfull.person_names)
try:
while True:
token = next(token_stream)
if not token.txt: # token.kind != TOK.WORD:
if state:
if None in state:
yield flush_match()
else:
yield from tq
tq = []
state = defaultdict(list)
yield token
continue
# Look for matches in the current state and build a new state
newstate: DefaultDict[
Union[str, None], List[Tuple[List[str], Entity]]
] = defaultdict(list)
w = token.txt # Original word
def add_to_state(slist: List[str], entity: Entity) -> None:
"""Add the list of subsequent words to the new parser state"""
wrd = slist[0] if slist else None
rest = slist[1:]
newstate[wrd].append((rest, entity))
if w in state:
# This matches an expected token
tq.append(token) # Add to lookahead token queue
# Add the matching tails to the new state
for sl, entity in state[w]:
add_to_state(sl, entity)
# Update the lastnames mapping
fullname = " ".join([t.txt for t in tq])
parts = fullname.split()
# If we now have 'Hillary Rodham Clinton',
# make sure we delete the previous 'Rodham' entry
for p in parts[1:-1]:
if p in lastnames:
del lastnames[p]
if parts[-1][0].isupper():
# 'Clinton' -> 'Hillary Rodham Clinton'
lastnames[parts[-1]] = token_ctor.Entity(fullname)
else:
# Not a match for an expected token
if state:
if None in state:
# We have an accumulated match, but if the next token
# is an uppercase word without a BÍN meaning, we
# append it to the current entity regardless.
# This means that 'Charley Lucknow' is handled as a single
# new entity name even if 'Charley' already exists
# as an entity.
while w and w[0].isupper() and not token.val:
# Append to the accumulated token queue, which will
# be squashed to a single token in flush_match()
tq.append(token)
token = next(token_stream)
w = token.txt
# Flush the already accumulated match
yield flush_match()
else:
yield from tq
tq = []
# Add all possible new states for entity names
# that could be starting
weak = True
cnt = 1
upper = w and w[0].isupper()
parts: List[str] = []
if upper and " " in w:
# For all uppercase phrases (words, entities, persons),
# maintain a map of last names to full names
parts = w.split()
lastname = parts[-1]
# Clinton -> Hillary [Rodham] Clinton
if lastname[0].isupper():
# Look for Icelandic patronyms/matronyms
_, m = db.lookup_g(lastname, False)
if m and any(mm.fl in {"föð", "móð"} for mm in m):
# We don't store Icelandic patronyms/matronyms
# as surnames
pass
else:
lastnames[lastname] = token
elist: List[Entity] = []
if token.kind == TOK.WORD and upper and w not in Abbreviations.DICT:
if " " in w:
# w may be a person name with more than one embedded word
# parts is assigned in the if statement above
cnt = len(parts)
elif not token.has_meanings or ("-" in token.meanings[0].stofn):
# No BÍN meaning for this token, or the meanings
# were constructed by concatenation (indicated by a hyphen
# in the stem)
weak = False # Accept single-word entity references
# elist is a list of Entity instances
elist = query_entities(w)
if elist:
# This word might be a candidate to start an entity reference
candidate = False
for e in elist:
# List of subsequent words in entity name
sl = e.name.split()[cnt:]
if sl:
# Here's a candidate for a longer entity reference
# than we already have
candidate = True
if sl or not weak:
add_to_state(sl, e)
if weak and not candidate:
# Found no potential entity reference longer than this token
# already is - and we have a BÍN meaning for it:
# Abandon the effort
assert not newstate
assert not tq
yield token_or_entity(token)
else:
# Go for it: Initialize the token queue
tq = [token]
else:
# Not a start of an entity reference: simply yield the token
assert not tq
if upper:
# Might be a last name referring to a full name
yield token_or_entity(token)
else:
yield token
# Transition to the new state
state = newstate
except StopIteration:
# Token stream is exhausted
pass
# Yield an accumulated match if present
if state:
if None in state:
yield flush_match()
else:
yield from tq
tq = []
assert not tq
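

# A minimal usage sketch, not part of the original module: it assumes that
# reynir.bintokenizer.tokenize() accepts a plain text string and returns a
# token iterator, and that the scraper database used by db.SessionContext
# is reachable. The sample sentence is purely illustrative.
if __name__ == "__main__":
    from reynir.bintokenizer import tokenize

    sample = "Hillary Rodham Clinton hélt ræðu í Reykjavík."
    for tok in recognize_entities(tokenize(sample)):
        # Multi-word entity names arrive here merged into single
        # entity (or person) tokens
        print(tok.kind, tok.txt)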