diff --git a/deeppavlov/models/spelling_correction/levenshtein/levenshtein_searcher.py b/deeppavlov/models/spelling_correction/levenshtein/levenshtein_searcher.py index 3c05d6848c..73837740c7 100644 --- a/deeppavlov/models/spelling_correction/levenshtein/levenshtein_searcher.py +++ b/deeppavlov/models/spelling_correction/levenshtein/levenshtein_searcher.py @@ -23,34 +23,45 @@ class LevenshteinSearcher: """ - Класс для поиска близких слов - в соответствии с расстоянием Левенштейна + Class for searching for close words + according to the Levenshtein distance """ - def __init__(self, alphabet, dictionary, operation_costs=None, - allow_spaces=False, euristics='none'): + def __init__( + self, + alphabet, + dictionary, + operation_costs=None, + allow_spaces=False, + heuristics="none", + ): self.alphabet = alphabet self.allow_spaces = allow_spaces - if isinstance(euristics, int): - if euristics < 0: - raise ValueError("Euristics should be non-negative integer or None") + if isinstance(heuristics, int): + if heuristics < 0: + raise ValueError("Heuristics should be non-negative integer or None") else: - self.euristics = euristics if euristics != 0 else None - elif euristics in ["none", "None", None]: - self.euristics = None + self.heuristics = heuristics if heuristics != 0 else None + elif heuristics in ["none", "None", None]: + self.heuristics = None else: - raise ValueError("Euristics should be non-negative integer or None") + raise ValueError("Heuristics should be non-negative integer or None") if isinstance(dictionary, Trie): - # словарь передан уже в виде бора + # the dictionary is already a Trie self.dictionary = dictionary else: - self.dictionary = make_trie(alphabet, dictionary, make_cashed=True, - precompute_symbols=self.euristics, - allow_spaces=self.allow_spaces) + self.dictionary = make_trie( + alphabet, + dictionary, + make_cashed=True, + precompute_symbols=self.heuristics, + allow_spaces=self.allow_spaces, + ) self.transducer = SegmentTransducer( - alphabet, 
operation_costs=operation_costs, allow_spaces=allow_spaces) - self._precompute_euristics() + alphabet, operation_costs=operation_costs, allow_spaces=allow_spaces + ) + self._precompute_heuristics() self._define_h_function() def __contains__(self, word): @@ -60,25 +71,28 @@ def search(self, word, d, allow_spaces=True, return_cost=True): """ Finds all dictionary words in d-window from word """ - if not all((c in self.alphabet - or (c == " " and self.allow_spaces)) for c in word): + if not all( + (c in self.alphabet or (c == " " and self.allow_spaces)) for c in word + ): return [] # raise ValueError("{0} contains an incorrect symbol".format(word)) return self._trie_search( - word, d, allow_spaces=allow_spaces, return_cost=return_cost) + word, d, allow_spaces=allow_spaces, return_cost=return_cost + ) - def _trie_search(self, word, d, transducer=None, - allow_spaces=True, return_cost=True): + def _trie_search( + self, word, d, transducer=None, allow_spaces=True, return_cost=True + ): """ - Находит все слова в префиксном боре, расстояние до которых - в соответствии с заданным преобразователем не превышает d + Finds all words in the Trie, the distance to which in + accordance with the specified converter does not exceed d """ if transducer is None: - # разобраться с пробелами + # deals with spaces transducer = self.transducer.inverse() allow_spaces &= self.allow_spaces trie = self.dictionary - # инициализация переменных + # variable initialization used_agenda_keys = set() agenda = SortedListWithKey(key=(lambda x: x[1])) h = self.h_func(word, trie.root) @@ -87,7 +101,7 @@ def _trie_search(self, word, d, transducer=None, agenda.add((key, value)) answer = dict() k = 0 - # очередь с приоритетом с промежуточными результатами + # priority queue with intermediate results while len(agenda) > 0: key, value = agenda.pop(0) if key in used_agenda_keys: @@ -95,18 +109,18 @@ def _trie_search(self, word, d, transducer=None, used_agenda_keys.add(key) low, pos, index = key cost, g, h = 
value - # g --- текущая стоимость, h --- нижняя оценка будущей стоимости - # cost = g + h --- нижняя оценка суммарной стоимости + # g --- current cost, h --- lower estimate of future cost + # cost = g + h --- lower estimate of the total cost k += 1 max_upperside_length = min(len(word) - pos, transducer.max_up_length) for upperside_length in range(max_upperside_length + 1): new_pos = pos + upperside_length - curr_up = word[pos: new_pos] + curr_up = word[pos:new_pos] if curr_up not in transducer.operation_costs: continue for curr_low, curr_cost in transducer.operation_costs[curr_up].items(): new_g = g + curr_cost - if new_g > d: # если g > d, то h можно не вычислять + if new_g > d: # if g > d, then h doesn't need to be calculated continue if curr_low == " ": if allow_spaces and trie.is_final(index): @@ -135,82 +149,86 @@ def _trie_search(self, word, d, transducer=None, else: return [elem[0] for elem in answer] - def _precompute_euristics(self): + def _precompute_heuristics(self): """ - Предвычисляет будущие символы и стоимости операций с ними - для h-эвристики + Precalculates future symbols and the cost of operations with them + for h-heuristics """ - if self.euristics is None: + if self.heuristics is None: return - # вычисление минимальной стоимости операции, - # приводящей к появлению ('+') или исчезновению ('-') данного символа + # calculation of the minimum cost of the operation, + # leading to the appearance ('+') or disappearance ('-') of this symbol removal_costs = {a: np.inf for a in self.alphabet} insertion_costs = {a: np.inf for a in self.alphabet} if self.allow_spaces: - removal_costs[' '] = np.inf - insertion_costs[' '] = np.inf + removal_costs[" "] = np.inf + insertion_costs[" "] = np.inf for up, costs in self.transducer.operation_costs.items(): for low, cost in costs.items(): if up == low: continue - if up != '': + if up != "": removal_cost = cost / len(up) for a in up: removal_costs[a] = min(removal_costs[a], removal_cost) - if low != '': + if low != 
"": insertion_cost = cost / len(low) for a in low: insertion_costs[a] = min(insertion_costs[a], insertion_cost) - # предвычисление возможных будущих символов в узлах дерева - # precompute_future_symbols(self.dictionary, self.euristics, self.allow_spaces) - # предвычисление стоимостей потери символа в узлах дерева - self._absense_costs_by_node = _precompute_absense_costs( - self.dictionary, removal_costs, insertion_costs, - self.euristics, self.allow_spaces) - # массив для сохранения эвристик - self._temporary_euristics = [dict() for i in range(len(self.dictionary))] + # precomputation of possible future symbols in tree nodes + # precompute_future_symbols(self.dictionary, self.heuristics, self.allow_spaces) + # precomputing the cost of symbol loss in tree nodes + self._absence_costs_by_node = _precompute_absence_costs( + self.dictionary, + removal_costs, + insertion_costs, + self.heuristics, + self.allow_spaces, + ) + # array for storing heuristics + self._temporary_heuristics = [dict() for i in range(len(self.dictionary))] def _define_h_function(self): - if self.euristics in [None, 0]: - self.h_func = (lambda *x: 0.0) + if self.heuristics in [None, 0]: + self.h_func = lambda *x: 0.0 else: - self.h_func = self._euristic_h_function + self.h_func = self._heuristic_h_function - def _euristic_h_function(self, suffix, index): + def _heuristic_h_function(self, suffix, index): """ - Вычисление h-эвристики из работы Hulden,2009 для текущей вершины словаря + Calculation of the h-heuristic from Hulden, 2009 for the current vertex of the dictionary - Аргументы: + Arguments: ---------- suffix : string - непрочитанный суффикс входного слова + the unread suffix of the input word index : int - индекс текущего узла в словаре + index of the current node in the dictionary - Возвращает: + Returns: ----------- - cost : float - оценка снизу для стоимости замены, - приводящей к входному слову с суффиксом suffix, - если прочитанный префикс слова без опечатки - привёл в вершину с номером 
index + cost : float + lower estimate of the replacement cost, + leading to an input word with the suffix ``suffix``, + if the read prefix of the correctly spelled word + led to the vertex with the ``index`` number """ - if self.euristics > 0: - suffix = suffix[:self.euristics] - # кэширование результатов - index_temporary_euristics = self._temporary_euristics[index] - cost = index_temporary_euristics.get(suffix, None) + if self.heuristics > 0: + suffix = suffix[: self.heuristics] + # caching of results + index_temporary_heuristics = self._temporary_heuristics[index] + cost = index_temporary_heuristics.get(suffix, None) if cost is not None: return cost - # извлечение нужных данных из массивов - absense_costs = self._absense_costs_by_node[index] + # extracting the necessary data from arrays + absence_costs = self._absence_costs_by_node[index] data = self.dictionary.data[index] - costs = np.zeros(dtype=np.float64, shape=(self.euristics,)) - # costs[j] --- оценка штрафа при предпросмотре вперёд на j символов + costs = np.zeros(dtype=np.float64, shape=(self.heuristics,)) + # costs[j] --- penalty estimate when previewing forward by j characters for i, a in enumerate(suffix): - costs[i:] += absense_costs[a][i:] + costs[i:] += absence_costs[a][i:] cost = max(costs) - index_temporary_euristics[suffix] = cost + index_temporary_heuristics[suffix] = cost return cost def _minimal_replacement_cost(self, first, second): @@ -226,52 +244,57 @@ def _minimal_replacement_cost(self, first, second): return min(removal_cost, insertion_cost) -def _precompute_absense_costs(dictionary, removal_costs, insertion_costs, n, - allow_spaces=False): +def _precompute_absence_costs( + dictionary, removal_costs, insertion_costs, n, allow_spaces=False +): """ - Вычисляет минимальную стоимость появления нового символа в узлах словаря - в соответствии со штрафами из costs + Calculates the minimum cost of the appearance of a new character in the dictionary nodes + according to the penalties from costs - 
Аргументы: + Arguments: --------------- - dictionary : Trie - словарь, хранящийся в виде ациклического автомата + dictionary : Trie + a dictionary stored as an acyclic automaton removal_costs : dict - штрафы за удаление символов + penalties for deleting characters insertion_costs : dict - штрафы за вставку символов + penalties for inserting characters n : int - глубина ``заглядывания вперёд'' в словаре + the depth of "looking ahead" in the dictionary - Возвращает + Returns --------------- answer : list of dicts, len(answer)=len(dictionary) - answer[i][a][j] равно минимальному штрафу за появление символа a - в j-ой позиции в вершине с номером i + answer[i][a][j] is equal to the minimum penalty for the appearance of the symbol a + in the j-th position at the vertex with the number i """ answer = [dict() for node in dictionary.data] if n == 0: return answer curr_alphabet = copy.copy(dictionary.alphabet) if allow_spaces: - curr_alphabet += [' '] + curr_alphabet += [" "] for l, (costs_in_node, node) in enumerate(zip(answer, dictionary.data)): - # определение минимальной стоимости удаления символов + # determining the minimum cost of deleting characters curr_node_removal_costs = np.empty(dtype=np.float64, shape=(n,)) if len(node[0]) > 0: - curr_node_removal_costs[0] = min(removal_costs[symbol] for symbol in node[0]) + curr_node_removal_costs[0] = min( + removal_costs[symbol] for symbol in node[0] + ) for j, symbols in enumerate(node[1:], 1): if len(symbols) == 0: curr_node_removal_costs[j:] = curr_node_removal_costs[j - 1] break curr_cost = min(removal_costs[symbol] for symbol in symbols) - curr_node_removal_costs[j] = min(curr_node_removal_costs[j - 1], curr_cost) + curr_node_removal_costs[j] = min( + curr_node_removal_costs[j - 1], curr_cost + ) else: curr_node_removal_costs[:] = np.inf - # определение минимальной стоимости вставки + # determining the minimum insertion cost for a in curr_alphabet: curr_symbol_costs = np.empty(dtype=np.float64, shape=(n,)) 
curr_symbol_costs.fill(insertion_costs[a]) @@ -279,28 +302,30 @@ def _precompute_absense_costs(dictionary, removal_costs, insertion_costs, n, if a in symbols: curr_symbol_costs[j:] = 0.0 break - curr_symbol_costs[j] = min(curr_symbol_costs[j], curr_node_removal_costs[j]) + curr_symbol_costs[j] = min( + curr_symbol_costs[j], curr_node_removal_costs[j] + ) costs_in_node[a] = curr_symbol_costs return answer class SegmentTransducer: """ - Класс, реализующий взвешенный конечный преобразователь, - осуществляющий замены из заданного списка операций + Class implementing a weighted finite converter, + performing substitutions from a given list of operations - Аргументы: + Arguments: ---------- alphabet : list - алфавит + alphabet operation_costs : dict or None(optional, default=None) - словарь вида {(up,low) : cost} + dictionary in the form {(up,low): cost} allow_spaces : bool(optional, default=False) - разрешены ли элементы трансдукции, содержащие пробел - (используется только если явно не заданы operation costs - и они равны значению по умолчанию) + allow transduction elements which contain spaces + (used only if operation costs are not explicitly specified, + and they are equal to the default value) """ @@ -316,26 +341,26 @@ def __init__(self, alphabet, operation_costs=None, allow_spaces=False): self._make_maximal_key_lengths() # self.maximal_value_lengths = {} # for up, probs in self.operation_costs.items(): - # СЛИШКОМ МНОГО ВЫЗОВОВ, НАДО КАК-ТО ЗАПОМНИТЬ - # МАКСИМАЛЬНЫЕ ДЛИНЫ КЛЮЧЕЙ ПРИ ОБРАЩЕНИИ + # THERE ARE TOO MANY CALLS, YOU NEED TO REMEMBER SOMEHOW + # MAXIMUM KEY LENGTHS WHEN ACCESSING # max_low_length = max(len(low) for low in probs) if (len(probs) > 0) else -1 # self.maximal_value_lengths[up] = self.maximal_key_length def get_operation_cost(self, up, low): """ - Возвращает стоимость элементарной трансдукции up->low - или np.inf, если такой элементарной трансдукции нет + Returns the cost of elementary transduction up->low + or np.inf, if there is no such 
elementary transduction - Аргументы: + Arguments: ---------- up, low : string - элементы элементарной трансдукции + elements of elementary transduction - Возвращает: + Returns: ----------- - cost : float - стоимость элементарной трансдукции up->low - (np.inf, если такая трансдукция отсутствует) + cost : float + the cost of elementary transduction up->low + (np.inf, if there is no such transduction) """ up_costs = self.operation_costs.get(up, None) if up_costs is None: @@ -345,9 +370,9 @@ def get_operation_cost(self, up, low): def inverse(self): """ - Строит пробразователь, задающий обратное конечное преобразование + Build a converter specifying the inverse finite transformation """ - # УПРОСТИТЬ ОБРАЩЕНИЕ!!! + # SIMPLIFY HANDLING!!! inversed_transducer = SegmentTransducer(self.alphabet, operation_costs=dict()) inversed_transducer.operation_costs = self._reversed_operation_costs inversed_transducer._reversed_operation_costs = self.operation_costs @@ -359,44 +384,46 @@ def inverse(self): def distance(self, first, second, return_transduction=False): """ - Вычисляет трансдукцию минимальной стоимости, - отображающую first в second + Calculates the minimum cost transduction, + mapping first to second - Аргументы: + Arguments: ----------- first : string second : string - Верхний и нижний элементы трансдукции + Upper and lower transduction elements return_transduction : bool (optional, default=False) - следует ли возвращать трансдукцию минимального веса - (см. 
возвращаемое значение) + should the minimum weight transduction be returned + (see return value) - Возвращает: + Returns: ----------- - (final_cost, transductions) : tuple(float, list) - если return_transduction=True, то возвращает - минимальную стоимость трансдукции, переводящей first в second - и список трансдукций с данной стоимостью - - final_cost : float - если return_transduction=False, то возвращает - минимальную стоимость трансдукции, переводящей first в second + (final_cost, transductions) : tuple(float, list) + if return_transduction=True, then returns + the minimum cost of the transduction that converts first to second + and a list of transductions with a given cost + + final_cost : float + if return_transduction=False, it returns + the minimum cost of the transduction that converts first to second """ if return_transduction: - add_pred = (lambda x, y: (y == np.inf or x < y)) + add_pred = lambda x, y: (y == np.inf or x < y) else: - add_pred = (lambda x, y: (y == np.inf or x <= y)) - clear_pred = (lambda x, y: x < y < np.inf) + add_pred = lambda x, y: (y == np.inf or x <= y) + clear_pred = lambda x, y: x < y < np.inf update_func = lambda x, y: min(x, y) - costs, backtraces = self._fill_levenshtein_table(first, second, - update_func, add_pred, clear_pred) + costs, backtraces = self._fill_levenshtein_table( + first, second, update_func, add_pred, clear_pred + ) final_cost = costs[-1][-1] if final_cost == np.inf: transductions = [None] elif return_transduction: - transductions = self._backtraces_to_transductions(first, second, backtraces, - final_cost, return_cost=False) + transductions = self._backtraces_to_transductions( + first, second, backtraces, final_cost, return_cost=False + ) if return_transduction: return final_cost, transductions else: @@ -404,40 +431,41 @@ def distance(self, first, second, return_transduction=False): def transduce(self, first, second, threshold): """ - Возвращает все трансдукции, переводящие first в second, - чья стоимость не 
превышает threshold + Returns all transductions that translate first to second, + whose value does not exceed threshold - Возвращает: + Returns: ---------- result : list - список вида [(трансдукция, стоимость)] + a list of the form [(transduction, cost)] """ - add_pred = (lambda x, y: x <= threshold) - clear_pred = (lambda x, y: False) - update_func = (lambda x, y: min(x, y)) - costs, backtraces = self._fill_levenshtein_table(first, second, - update_func, add_pred, clear_pred, - threshold=threshold) - result = self._backtraces_to_transductions(first, second, - backtraces, threshold, return_cost=True) + add_pred = lambda x, y: x <= threshold + clear_pred = lambda x, y: False + update_func = lambda x, y: min(x, y) + costs, backtraces = self._fill_levenshtein_table( + first, second, update_func, add_pred, clear_pred, threshold=threshold + ) + result = self._backtraces_to_transductions( + first, second, backtraces, threshold, return_cost=True + ) return result def lower_transductions(self, word, max_cost, return_cost=True): """ - Возвращает все трансдукции с верхним элементом word, - чья стоимость не превышает max_cost + Returns all transductions with the top ``word`` element, + whose cost does not exceed ``max_cost`` - ` Возвращает: + Returns: ---------- result : list - список вида [(трансдукция, стоимость)], если return_cost=True - список трансдукций, если return_cost=False - список отсортирован в порядке возрастания стоимости трансдукции + a list of the form [(transduction, cost)] if return_cost=True + the list of transductions, if return_cost=False + the list is sorted by the cost of transduction in ascending order """ prefixes = [[] for i in range(len(word) + 1)] prefixes[0].append(((), 0.0)) for pos in range(len(prefixes)): - # вставки + # inserts prefixes[pos] = self._perform_insertions(prefixes[pos], max_cost) max_upperside_length = min(len(word) - pos, self.max_up_length) for upperside_length in range(1, max_upperside_length + 1): @@ -447,7 +475,9 @@ def 
lower_transductions(self, word, max_cost, return_cost=True): new_cost = cost + low_cost if new_cost <= max_cost: new_transduction = transduction + (up, low) - prefixes[pos + upperside_length].append((new_transduction, new_cost)) + prefixes[pos + upperside_length].append( + (new_transduction, new_cost) + ) answer = sorted(prefixes[-1], key=(lambda x: x[0])) if return_cost: return answer @@ -476,55 +506,56 @@ def upper_transductions(self, word, max_cost, return_cost=True): inversed_transducer = self.inverse() return inversed_transducer.lower_transductions(word, max_cost, return_cost) - def _fill_levenshtein_table(self, first, second, update_func, add_pred, clear_pred, - threshold=None): + def _fill_levenshtein_table( + self, first, second, update_func, add_pred, clear_pred, threshold=None + ): """ - Функция, динамически заполняющая таблицу costs стоимости трансдукций, - costs[i][j] --- минимальная стоимость трансдукции, - переводящей first[:i] в second[:j] + A function that dynamically fills in the costs table of the cost of transductions, + costs[i][j] --- minimum cost of transduction, + translating first[:i] to second[:j] - Аргументы: + Arguments: ---------- first, second : string - Верхний и нижний элементы трансдукции + Upper and lower transduction elements update_func : callable, float*float -> bool - update_func(x, y) возвращает новое значение в ячейке таблицы costs, - если старое значение --- y, а потенциально новое значение --- x - везде update_func = min + update_func(x, y) returns a new value in a cell of the costs table, + if the old value is --- y, and the potential new value is --- x + everywhere update_func = min add_pred : callable : float*float -> bool - add_pred(x, y) возвращает, производится ли добавление - нового элемента p стоимости x в ячейку backtraces[i][j] - в зависимости от значения costs[i][j]=y и текущей стоимости x + add_pred(x, y) returns whether a new element p of value x is added + into the cell ``backtraces[i][j]`` + depending on the 
value of costs[i][j]=y and the current cost of x clear_pred : callable : float*float -> bool - clear_pred(x, y) возвращает, производится ли очистка - ячейки backtraces[i][j] в зависимости от значения costs[i][j]=y - и текущей стоимости x элемента p, добавляемого в эту ячейку + clear_pred(x, y) returns whether the cleanup is being performed + in the cell ``backtraces[i][j]`` depending on the value of costs[i][j]=y + and the current cost x of the element p being added to this cell - Возвращает: + Returns: ----------- costs : array, dtype=float, shape=(len(first)+1, len(second)+1) - массив, в ячейке с индексами i, j которого хранится - минимальная стоимость трансдукции, переводящей first[:i] в second[:j] + an array in a cell with indexes i, j of which is stored + the minimum cost of the transduction that translates first[:i] to second[:j] backtraces : array, dtype=list, shape=(len(first)+1, len(second)+1) - массив, в ячейке с индексами i, j которого хранятся - обратные ссылки на предыдущую ячейку в оптимальной трансдукции, - приводящей в ячейку backtraces[i][j] + an array in a cell with indexes i, j of which are stored + backlinks to the previous cell in optimal transduction, + leading to the cell backtraces[i][j] """ m, n = len(first), len(second) - # если threshold=None, то в качестве порога берётся удвоенная стоимость - # трансдукции, отображающей символы на одинаковых позициях друг в друга + # if threshold=None, then threshold is double the cost of + # the transduction that maps symbols at the same positions into each other if threshold is None: threshold = 0.0 for a, b in zip(first, second): threshold += self.get_operation_cost(a, b) if m > n: for a in first[n:]: - threshold += self.get_operation_cost(a, '') + threshold += self.get_operation_cost(a, "") elif m < n: for b in second[m:]: - threshold += self.get_operation_cost('', b) + threshold += self.get_operation_cost("", b) threshold *= 2 - # инициализация возвращаемых массивов + # initialization of returned 
arrays costs = np.zeros(shape=(m + 1, n + 1), dtype=np.float64) costs[:] = np.inf backtraces = [None] * (m + 1) @@ -533,7 +564,7 @@ def _fill_levenshtein_table(self, first, second, update_func, add_pred, clear_pr costs[0][0] = 0.0 for i in range(m + 1): for i_right in range(i, min(i + self.max_up_length, m) + 1): - up = first[i: i_right] + up = first[i:i_right] max_low_length = self.max_low_lengths_by_up.get(up, -1) if max_low_length == -1: # no up key in transduction continue @@ -542,10 +573,11 @@ def _fill_levenshtein_table(self, first, second, update_func, add_pred, clear_pr if costs[i][j] > threshold: continue if len(backtraces[i][j]) == 0 and i + j > 0: - continue # не нашлось обратных ссылок - for j_right in range((j if i_right > i else j + 1), - min(j + max_low_length, n) + 1): - low = second[j: j_right] + continue # no backlinks found + for j_right in range( + (j if i_right > i else j + 1), min(j + max_low_length, n) + 1 + ): + low = second[j:j_right] curr_cost = up_costs.get(low, np.inf) old_cost = costs[i_right][j_right] new_cost = costs[i][j] + curr_cost @@ -560,8 +592,8 @@ def _fill_levenshtein_table(self, first, second, update_func, add_pred, clear_pr def _make_reversed_operation_costs(self): """ - Заполняет массив _reversed_operation_costs - на основе имеющегося массива operation_costs + Populates the _reversed_operation_costs array + based on the existing operation_costs array """ _reversed_operation_costs = dict() for up, costs in self.operation_costs.items(): @@ -573,48 +605,55 @@ def _make_reversed_operation_costs(self): def _make_maximal_key_lengths(self): """ - Вычисляет максимальную длину элемента low - в элементарной трансдукции (up, low) для каждого up - и максимальную длину элемента up - в элементарной трансдукции (up, low) для каждого low + Calculates the maximum length of the element ``low`` + in the elementary transduction (up, low) for each ``up`` + and the maximum length of the ``up`` element + in the elementary transduction (up, low) 
for each ``low`` """ - self.max_up_length = \ - (max(len(up) for up in self.operation_costs) - if len(self.operation_costs) > 0 else -1) - self.max_low_length = \ - (max(len(low) for low in self._reversed_operation_costs) - if len(self._reversed_operation_costs) > 0 else -1) + self.max_up_length = ( + max(len(up) for up in self.operation_costs) + if len(self.operation_costs) > 0 + else -1 + ) + self.max_low_length = ( + max(len(low) for low in self._reversed_operation_costs) + if len(self._reversed_operation_costs) > 0 + else -1 + ) self.max_low_lengths_by_up, self.max_up_lengths_by_low = dict(), dict() for up, costs in self.operation_costs.items(): - self.max_low_lengths_by_up[up] = \ + self.max_low_lengths_by_up[up] = ( max(len(low) for low in costs) if len(costs) > 0 else -1 + ) for low, costs in self._reversed_operation_costs.items(): - self.max_up_lengths_by_low[low] = \ + self.max_up_lengths_by_low[low] = ( max(len(up) for up in costs) if len(costs) > 0 else -1 + ) - def _backtraces_to_transductions(self, first, second, backtraces, threshold, return_cost=False): + def _backtraces_to_transductions( + self, first, second, backtraces, threshold, return_cost=False + ): """ - Восстанавливает трансдукции по таблице обратных ссылок + Restores transductions from the backlink table - Аргументы: + Arguments: ---------- first, second : string - верхние и нижние элементы трансдукции + upper and lower transduction elements backtraces : array-like, dtype=list, shape=(len(first)+1, len(second)+1) - таблица обратных ссылок + backlink table threshold : float - порог для отсева трансдукций, - возвращаются только трансдукции стоимостью <= threshold + only transductions where ``cost <= threshold`` are returned return_cost : bool (optional, default=False) - если True, то вместе с трансдукциями возвращается их стоимость + if True, then their cost is returned along with the transductions - Возвращает: + Returns: ----------- result : list - список вида [(трансдукция, стоимость)], 
если return_cost=True - и вида [трансдукция], если return_cost=False, - содержащий все трансдукции, переводящие first в second, - чья стоимость не превышает threshold + a list of the form [(transduction, cost)] if return_cost=True + and of the form [transduction] if return_cost=False, + containing all the transductions that translate ``first`` to ``second``, + whose cost does not exceed ``threshold`` """ m, n = len(first), len(second) agenda = [None] * (m + 1) @@ -631,7 +670,7 @@ def _backtraces_to_transductions(self, first, second, backtraces, threshold, ret add_cost = self.operation_costs[up][low] for elem, cost in current_agenda: new_cost = cost + add_cost - if new_cost <= threshold: # удаление трансдукций большой стоимости + if new_cost <= threshold: # remove high cost transductions agenda[i][j].append((((up, low),) + elem, new_cost)) if return_cost: return agenda[0][0] @@ -640,20 +679,20 @@ def _backtraces_to_transductions(self, first, second, backtraces, threshold, ret def _perform_insertions(self, initial, max_cost): """ - возвращает все трансдукции стоимости <= max_cost, - которые можно получить из элементов initial + Returns all transductions where ``cost <= max_cost``, + which can be obtained from the initial elements - Аргументы: + Arguments: ---------- initial : list of tuples - список исходных трансдукций вида [(трансдукция, стоимость)] - max_cost : float - максимальная стоимость трансдукции + list of initial transductions of the form [(transduction, cost)] + max_cost : float + maximum cost of transduction - Возвращает: + Returns: ----------- final : list of tuples - финальный список трансдукций вида [(трансдукция, стоимость)] + the final list of transductions of the form [(transduction, cost)] """ queue = list(initial) final = initial @@ -670,10 +709,10 @@ def _perform_insertions(self, initial, max_cost): def _make_default_operation_costs(self, allow_spaces=False): """ - sets 1.0 cost for every replacement, insertion, deletion and transposition + 
Sets 1.0 cost for every replacement, insertion, deletion and transposition """ self.operation_costs = dict() - self.operation_costs[""] = {c: 1.0 for c in list(self.alphabet) + [' ']} + self.operation_costs[""] = {c: 1.0 for c in list(self.alphabet) + [" "]} for a in self.alphabet: current_costs = {c: 1.0 for c in self.alphabet} current_costs[a] = 0.0 @@ -681,10 +720,10 @@ def _make_default_operation_costs(self, allow_spaces=False): if allow_spaces: current_costs[" "] = 1.0 self.operation_costs[a] = current_costs - # транспозиции + # transpositions for a, b in itertools.permutations(self.alphabet, 2): self.operation_costs[a + b] = {b + a: 1.0} - # пробелы + # spaces if allow_spaces: self.operation_costs[" "] = {c: 1.0 for c in self.alphabet} self.operation_costs[" "][""] = 1.0 diff --git a/deeppavlov/models/spelling_correction/levenshtein/searcher_component.py b/deeppavlov/models/spelling_correction/levenshtein/searcher_component.py index 93e2f631b9..94a564cbcb 100644 --- a/deeppavlov/models/spelling_correction/levenshtein/searcher_component.py +++ b/deeppavlov/models/spelling_correction/levenshtein/searcher_component.py @@ -23,7 +23,7 @@ logger = getLogger(__name__) -@register('spelling_levenshtein') +@register("spelling_levenshtein") class LevenshteinSearcherComponent(Component): """Component that finds replacement candidates for tokens at a set Damerau-Levenshtein distance @@ -42,14 +42,24 @@ class LevenshteinSearcherComponent(Component): _punctuation = frozenset(string.punctuation) - def __init__(self, words: Iterable[str], max_distance: int = 1, error_probability: float = 1e-4, - vocab_penalty: Optional[float] = None, **kwargs): - words = list({word.strip().lower().replace('ё', 'е') for word in words}) + def __init__( + self, + words: Iterable[str], + max_distance: int = 1, + error_probability: float = 1e-4, + vocab_penalty: Optional[float] = None, + **kwargs + ): + words = list({word.strip().lower().replace("ё", "е") for word in words}) alphabet = 
sorted({letter for word in words for letter in word}) self.max_distance = max_distance self.error_probability = log10(error_probability) - self.vocab_penalty = self.error_probability if vocab_penalty is None else log10(vocab_penalty) - self.searcher = LevenshteinSearcher(alphabet, words, allow_spaces=True, euristics=2) + self.vocab_penalty = ( + self.error_probability if vocab_penalty is None else log10(vocab_penalty) + ) + self.searcher = LevenshteinSearcher( + alphabet, words, allow_spaces=True, heuristics=2 + ) def _infer_instance(self, tokens: Iterable[str]) -> List[List[Tuple[float, str]]]: candidates = [] @@ -57,13 +67,21 @@ def _infer_instance(self, tokens: Iterable[str]) -> List[List[Tuple[float, str]] if word in self._punctuation: candidates.append([(0, word)]) else: - c = {candidate: self.error_probability * distance - for candidate, distance in self.searcher.search(word, d=self.max_distance)} + c = { + candidate: self.error_probability * distance + for candidate, distance in self.searcher.search( + word, d=self.max_distance + ) + } c[word] = c.get(word, self.vocab_penalty) - candidates.append([(score, candidate) for candidate, score in c.items()]) + candidates.append( + [(score, candidate) for candidate, score in c.items()] + ) return candidates - def __call__(self, batch: Iterable[Iterable[str]], *args, **kwargs) -> List[List[List[Tuple[float, str]]]]: + def __call__( + self, batch: Iterable[Iterable[str]], *args, **kwargs + ) -> List[List[List[Tuple[float, str]]]]: """Propose candidates for tokens in sentences Args: diff --git a/deeppavlov/models/spelling_correction/levenshtein/tabled_trie.py b/deeppavlov/models/spelling_correction/levenshtein/tabled_trie.py index cb24d12245..81782fd0ce 100644 --- a/deeppavlov/models/spelling_correction/levenshtein/tabled_trie.py +++ b/deeppavlov/models/spelling_correction/levenshtein/tabled_trie.py @@ -20,33 +20,56 @@ class Trie: """ - Реализация префиксного бора (точнее, корневого направленного ациклического графа) + 
Implements directed acyclic graph - Атрибуты + Attributes -------- - alphabet: list, алфавит - alphabet_codes: dict, словарь символ:код - compressed: bool, индикатор сжатия - cashed: bool, индикатор кэширования запросов к функции descend - root: int, индекс корня - graph: array, type=int, shape=(число вершин, размер алфавита), матрица потомков - graph[i][j] = k <-> вершина k --- потомок вершины i по ребру, помеченному символом alphabet[j] - data: array, type=object, shape=(число вершин), массив с данными, хранящямися в вершинах - final: array, type=bool, shape=(число вершин), массив индикаторов - final[i] = True <-> i --- финальная вершина + alphabet: list + + alphabet_codes: dict with alphabet symbols as keys and their codes as values + + root: int, root index + + graph: array, type=int, shape=(n vertices, alphabet size), matrix of descendants + + graph[i][j] = k <-> vertex k --- descendant of vertex i connected by edge indicated by alphabet[j] + + data: array, type=object, shape=(n vertices), array of data stored in vertices + + final: array, type=bool, shape=(n vertices), array of indicators + + final[i] = True <-> i --- final vertex + """ + NO_NODE = -1 SPACE_CODE = -1 - ATTRS = ['is_numpied', 'precompute_symbols', 'allow_spaces', - 'is_terminated', 'to_make_cashed'] - - def __init__(self, alphabet, make_sorted=True, make_alphabet_codes=True, - is_numpied=False, to_make_cashed=False, - precompute_symbols=None, allow_spaces=False, dict_storage=False): + ATTRS = [ + "is_numpied", + "precompute_symbols", + "allow_spaces", + "is_terminated", + "to_make_cashed", + ] + + def __init__( + self, + alphabet, + make_sorted=True, + make_alphabet_codes=True, + is_numpied=False, + to_make_cashed=False, + precompute_symbols=None, + allow_spaces=False, + dict_storage=False, + ): self.alphabet = sorted(alphabet) if make_sorted else alphabet - self.alphabet_codes = ({a: i for i, a in enumerate(self.alphabet)} - if make_alphabet_codes else self.alphabet) + self.alphabet_codes = ( 
+ {a: i for i, a in enumerate(self.alphabet)} + if make_alphabet_codes + else self.alphabet + ) self.alphabet_codes[" "] = Trie.SPACE_CODE self.is_numpied = is_numpied self.to_make_cashed = to_make_cashed @@ -67,40 +90,51 @@ def _make_default_node(self): if self.dict_storage: return defaultdict(lambda: -1) elif self.is_numpied: - return np.full(shape=(len(self.alphabet),), - fill_value=Trie.NO_NODE, dtype=int) + return np.full( + shape=(len(self.alphabet),), fill_value=Trie.NO_NODE, dtype=int + ) else: return [Trie.NO_NODE] * len(self.alphabet) def save(self, outfile): """ - Сохраняет дерево для дальнейшего использования + Dumps trie to outfile """ with open(outfile, "w", encoding="utf8") as fout: attr_values = [getattr(self, attr) for attr in Trie.ATTRS] attr_values.append(any(x is not None for x in self.data)) - fout.write("{}\n{}\t{}\n".format( - " ".join("T" if x else "F" for x in attr_values), - self.nodes_number, self.root)) + fout.write( + "{}\n{}\t{}\n".format( + " ".join("T" if x else "F" for x in attr_values), + self.nodes_number, + self.root, + ) + ) fout.write(" ".join(str(a) for a in self.alphabet) + "\n") for index, label in enumerate(self.final): letters = self._get_letters(index, return_indexes=True) children = self._get_children(index) - fout.write("{}\t{}\n".format( - "T" if label else "F", " ".join("{}:{}".format(*elem) - for elem in zip(letters, children)))) + fout.write( + "{}\t{}\n".format( + "T" if label else "F", + " ".join( + "{}:{}".format(*elem) for elem in zip(letters, children) + ), + ) + ) if self.precompute_symbols is not None: for elem in self.data: - fout.write(":".join(",".join( - map(str, symbols)) for symbols in elem) + "\n") + fout.write( + ":".join(",".join(map(str, symbols)) for symbols in elem) + "\n" + ) return - def make_cashed(self): + def make_cached(self): """ - Включает кэширование запросов к descend + Enables descend caching """ - self._descendance_cash = [dict() for _ in self.graph] - self.descend = 
self._descend_cashed + self._descendance_cache = [dict() for _ in self.graph] + self.descend = self._descend_cached def make_numpied(self): self.graph = np.array(self.graph) @@ -109,7 +143,7 @@ def make_numpied(self): def add(self, s): """ - Добавление строки s в префиксный бор + Adds string ``s`` to trie """ if self.is_terminated: raise TypeError("Impossible to add string to fitted trie") @@ -138,10 +172,11 @@ def terminate(self): self.make_numpied() self.terminated = True if self.precompute_symbols is not None: - precompute_future_symbols(self, self.precompute_symbols, - allow_spaces=self.allow_spaces) + precompute_future_symbols( + self, self.precompute_symbols, allow_spaces=self.allow_spaces + ) if self.to_make_cashed: - self.make_cashed() + self.make_cached() def __contains__(self, s): if any(a not in self.alphabet for a in s): @@ -152,7 +187,7 @@ def __contains__(self, s): def words(self): """ - Возвращает итератор по словам, содержащимся в боре + Yields trie words """ branch, word, indexes = [self.root], [], [0] letters_with_children = [self._get_children_and_letters(self.root)] @@ -175,20 +210,14 @@ def words(self): def is_final(self, index): """ - Аргументы - --------- - index: int, номер вершины - - Возвращает - ---------- - True: если index --- номер финальной вершины + Checks if the vertex is final """ return self.final[index] def find_partitions(self, s, max_count=1): """ - Находит все разбиения s = s_1 ... s_m на словарные слова s_1, ..., s_m - для m <= max_count + Finds all partitions s = s_1 ... 
s_m with words s_1, ..., s_m + where m <= max_count """ curr_agenda = [(self.root, [], 0)] for i, a in enumerate(s): @@ -208,7 +237,9 @@ def find_partitions(self, s, max_count=1): for curr, borders, cost in curr_agenda: if curr == self.root: borders = [0] + borders - answer.append([s[left:borders[i + 1]] for i, left in enumerate(borders[:-1])]) + answer.append( + [s[left: borders[i + 1]] for i, left in enumerate(borders[:-1])] + ) return answer def __len__(self): @@ -225,7 +256,9 @@ def __repr__(self): answer += " {0}:{1}".format(a, index) answer += "\n" if data is not None: - answer += "data:{0} {1}\n".format(len(data), " ".join(str(elem) for elem in data)) + answer += "data:{0} {1}\n".format( + len(data), " ".join(str(elem) for elem in data) + ) return answer def _add_descendant(self, parent, s, final=False): @@ -236,18 +269,18 @@ def _add_descendant(self, parent, s, final=False): def _add_empty_child(self, parent, code, final=False): """ - Добавление ребёнка к вершине parent по символу с кодом code + Adds a child to ``parent[code]`` """ self.graph[parent][code] = self.nodes_number self.graph.append(self._make_default_node()) self.data.append(None) self.final.append(final) self.nodes_number += 1 - return (self.nodes_number - 1) + return self.nodes_number - 1 def _descend_simple(self, curr, s): """ - Спуск из вершины curr по строке s + Descend from vertex ``curr`` using ``s`` symbols as path """ for a in s: curr = self.graph[curr][self.alphabet_codes[a]] @@ -255,17 +288,17 @@ def _descend_simple(self, curr, s): break return curr - def _descend_cashed(self, curr, s): + def _descend_cached(self, curr, s): """ - Спуск из вершины curr по строке s с кэшированием + Descend from vertex ``curr`` using ``s`` symbols as path using cache """ if s == "": return curr - curr_cash = self._descendance_cash[curr] + curr_cash = self._descendance_cache[curr] answer = curr_cash.get(s, None) if answer is not None: return answer - # для оптимизации дублируем код + # duplicate code for 
optimization res = curr for a in s: res = self.graph[res][self.alphabet_codes[a]] @@ -277,19 +310,20 @@ def _descend_cashed(self, curr, s): def _set_final(self, curr): """ - Делает состояние curr завершающим + Sets state ``curr`` as final """ self.final[curr] = True def _get_letters(self, index, return_indexes=False): """ - Извлекает все метки выходных рёбер вершины с номером index + Returns all letters of edges adjacent to ``index`` vertex """ if self.dict_storage: answer = list(self.graph[index].keys()) else: - answer = [i for i, elem in enumerate(self.graph[index]) - if elem != Trie.NO_NODE] + answer = [ + i for i, elem in enumerate(self.graph[index]) if elem != Trie.NO_NODE + ] if not return_indexes: answer = [(self.alphabet[i] if i >= 0 else " ") for i in answer] return answer @@ -298,8 +332,9 @@ def _get_children_and_letters(self, index, return_indexes=False): if self.dict_storage: answer = list(self.graph[index].items()) else: - answer = [elem for elem in enumerate(self.graph[index]) - if elem[1] != Trie.NO_NODE] + answer = [ + elem for elem in enumerate(self.graph[index]) if elem[1] != Trie.NO_NODE + ] if not return_indexes: for i, (letter_index, child) in enumerate(answer): answer[i] = (self.alphabet[letter_index], child) @@ -307,7 +342,7 @@ def _get_children_and_letters(self, index, return_indexes=False): def _get_children(self, index): """ - Извлекает всех потомков вершины с номером index + Returns all children of ``index`` vertex """ if self.dict_storage: return list(self.graph[index].values()) @@ -319,8 +354,16 @@ class TrieMinimizer: def __init__(self): pass - def minimize(self, trie, dict_storage=False, make_cashed=False, make_numpied=False, - precompute_symbols=None, allow_spaces=False, return_groups=False): + def minimize( + self, + trie, + dict_storage=False, + make_cashed=False, + make_numpied=False, + precompute_symbols=None, + allow_spaces=False, + return_groups=False, + ): N = len(trie) if N == 0: raise ValueError("Trie should be non-empty") 
@@ -342,27 +385,33 @@ def minimize(self, trie, dict_storage=False, make_cashed=False, make_numpied=Fal if key_class is not None: node_classes[index] = key_class else: - # появился новый класс + # new class class_keys.append(key) classes[key] = node_classes[index] = curr_index class_representatives.append(curr_index) curr_index += 1 - # построение нового дерева - compressed = Trie(trie.alphabet, is_numpied=make_numpied, - dict_storage=dict_storage, allow_spaces=allow_spaces, - precompute_symbols=precompute_symbols) + # build a new trie + compressed = Trie( + trie.alphabet, + is_numpied=make_numpied, + dict_storage=dict_storage, + allow_spaces=allow_spaces, + precompute_symbols=precompute_symbols, + ) L = len(classes) new_final = [elem[2] for elem in class_keys[::-1]] if dict_storage: new_graph = [defaultdict(int) for _ in range(L)] elif make_numpied: - new_graph = np.full(shape=(L, len(trie.alphabet)), - fill_value=Trie.NO_NODE, dtype=int) + new_graph = np.full( + shape=(L, len(trie.alphabet)), fill_value=Trie.NO_NODE, dtype=int + ) new_final = np.array(new_final, dtype=bool) else: new_graph = [[Trie.NO_NODE for a in trie.alphabet] for i in range(L)] - for (indexes, children, final), class_index in \ - sorted(classes.items(), key=(lambda x: x[1])): + for (indexes, children, final), class_index in sorted( + classes.items(), key=(lambda x: x[1]) + ): row = new_graph[L - class_index - 1] for i, child_index in zip(indexes, children): row[i] = L - child_index - 1 @@ -372,14 +421,17 @@ def minimize(self, trie, dict_storage=False, make_cashed=False, make_numpied=Fal compressed.nodes_number = L compressed.data = [None] * L if make_cashed: - compressed.make_cashed() + compressed.make_cached() if precompute_symbols is not None: - if (trie.is_terminated and trie.precompute_symbols - and trie.allow_spaces == allow_spaces): - # копируем будущие символы из исходного дерева - # нужно, чтобы возврат из финальных состояний в начальное был одинаковым в обоих деревьях + if ( + 
trie.is_terminated + and trie.precompute_symbols + and trie.allow_spaces == allow_spaces + ): + # copy future symbols from the original tree + # we have to make sure returning from the last to the first state is the same for both trees for i, node_index in enumerate(class_representatives[::-1]): - # будущие символы для представителя i-го класса + # future symbols for i-th class compressed.data[i] = copy.copy(trie.data[node_index]) else: precompute_future_symbols(compressed, precompute_symbols, allow_spaces) @@ -391,23 +443,23 @@ def minimize(self, trie, dict_storage=False, make_cashed=False, make_numpied=Fal def generate_postorder(self, trie): """ - Обратная топологическая сортировка + Reversed topological sort """ order, stack = [], [] stack.append(trie.root) - colors = ['white'] * len(trie) + colors = ["white"] * len(trie) while len(stack) > 0: index = stack[-1] color = colors[index] - if color == 'white': # вершина ещё не обрабатывалась - colors[index] = 'grey' + if color == "white": # the vertex hasn't been processed yet + colors[index] = "grey" for child in trie._get_children(index): - # проверяем, посещали ли мы ребёнка раньше - if child != Trie.NO_NODE and colors[child] == 'white': + # check if we have visited the child already + if child != Trie.NO_NODE and colors[child] == "white": stack.append(child) else: - if color == 'grey': - colors[index] = 'black' + if color == "grey": + colors[index] = "black" order.append(index) stack = stack[:-1] return order @@ -416,7 +468,7 @@ def generate_postorder(self, trie): def load_trie(infile): with open(infile, "r", encoding="utf8") as fin: line = fin.readline().strip() - flags = [x == 'T' for x in line.split()] + flags = [x == "T" for x in line.split()] if len(flags) != len(Trie.ATTRS) + 1: raise ValueError("Wrong file format") nodes_number, root = map(int, fin.readline().strip().split()) @@ -431,18 +483,19 @@ def load_trie(infile): graph = [defaultdict(lambda: -1) for _ in range(nodes_number)] elif trie.is_numpied: 
final = np.array(final) - graph = np.full(shape=(nodes_number, len(alphabet)), - fill_value=Trie.NO_NODE, dtype=int) + graph = np.full( + shape=(nodes_number, len(alphabet)), fill_value=Trie.NO_NODE, dtype=int + ) else: graph = [[Trie.NO_NODE for a in alphabet] for i in range(nodes_number)] for i in range(nodes_number): line = fin.readline().strip() if "\t" in line: label, transitions = line.split("\t") - final[i] = (label == "T") + final[i] = label == "T" else: label = line - final[i] = (label == "T") + final[i] = label == "T" continue transitions = [x.split(":") for x in transitions.split()] for code, value in transitions: @@ -457,21 +510,38 @@ def load_trie(infile): line = fin.readline().strip("\n") trie.data[i] = [set(elem.split(",")) for elem in line.split(":")] if trie.to_make_cashed: - trie.make_cashed() + trie.make_cached() return trie -def make_trie(alphabet, words, compressed=True, is_numpied=False, - make_cashed=False, precompute_symbols=False, - allow_spaces=False, dict_storage=False): - trie = Trie(alphabet, is_numpied=is_numpied, to_make_cashed=make_cashed, - precompute_symbols=precompute_symbols, dict_storage=dict_storage) +def make_trie( + alphabet, + words, + compressed=True, + is_numpied=False, + make_cashed=False, + precompute_symbols=False, + allow_spaces=False, + dict_storage=False, +): + trie = Trie( + alphabet, + is_numpied=is_numpied, + to_make_cashed=make_cashed, + precompute_symbols=precompute_symbols, + dict_storage=dict_storage, + ) trie.fit(words) if compressed: tm = TrieMinimizer() - trie = tm.minimize(trie, dict_storage=dict_storage, make_cashed=make_cashed, - make_numpied=is_numpied, precompute_symbols=precompute_symbols, - allow_spaces=allow_spaces) + trie = tm.minimize( + trie, + dict_storage=dict_storage, + make_cashed=make_cashed, + make_numpied=is_numpied, + precompute_symbols=precompute_symbols, + allow_spaces=allow_spaces, + ) return trie @@ -482,7 +552,7 @@ def precompute_future_symbols(trie, n, allow_spaces=False): if n == 
0: return if trie.is_terminated and trie.precompute_symbols: - # символы уже предпосчитаны + # symbols have been precalculated return for index, final in enumerate(trie.final): trie.data[index] = [set() for i in range(n)] @@ -495,7 +565,6 @@ def precompute_future_symbols(trie, n, allow_spaces=False): children = set(trie._get_children(index)) for child in children: node_data[d] |= trie.data[child][d - 1] - # в случае, если разрешён возврат по пробелу в стартовое состояние if allow_spaces and final: node_data[d] |= trie.data[trie.root][d - 1] trie.terminated = True