From 6b86171c9f2b4b01d63c820cb8767bf62200cf92 Mon Sep 17 00:00:00 2001 From: Efim Kubishkin Date: Wed, 15 May 2024 19:02:41 +0300 Subject: [PATCH 1/6] Add autotests for task 12 --- tasks/task12.md | 11 ++ tests/autotests/test_task11.py | 1 - tests/autotests/test_task12.py | 32 ++++ tests/autotests/to_program_parser.py | 244 +++++++++++++++++++++++++++ 4 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 tests/autotests/test_task12.py create mode 100644 tests/autotests/to_program_parser.py diff --git a/tasks/task12.md b/tasks/task12.md index 9fc494c67..8ec2a12d5 100644 --- a/tasks/task12.md +++ b/tasks/task12.md @@ -19,3 +19,14 @@ - Проследите за адекватностью сообщений об ошибках. Вам же проще отлаживаться будет. - Постарайтесь максимально использовать возможности ANTLR по работе с деревом разбора. - [ ] Добавить необходимые тесты. + +Требуемые функции: + +```python +# возвращает пару: корректна ли программа и сообщение об ошибке (None, если корректна) +def typing_program(program: str) -> bool: + pass + +def exec_program(program: str) -> dict[str, set[tuple]]: + pass +``` diff --git a/tests/autotests/test_task11.py b/tests/autotests/test_task11.py index e9c397c95..d7a97d3f8 100644 --- a/tests/autotests/test_task11.py +++ b/tests/autotests/test_task11.py @@ -2,7 +2,6 @@ # You MUST NOT touch anything here except ONE block below # You CAN modify this file IF AND ONLY IF you have found a bug and are willing to fix it # Otherwise, please report it -import inspect import io from contextlib import redirect_stdout diff --git a/tests/autotests/test_task12.py b/tests/autotests/test_task12.py new file mode 100644 index 000000000..dcb1ab8c1 --- /dev/null +++ b/tests/autotests/test_task12.py @@ -0,0 +1,32 @@ +import pytest +from to_program_parser import WELL_TYPED, ILL_TYPED, Program +from fixtures import graph +from grammars_constants import GRAMMARS_DIFFERENT +from networkx import MultiDiGraph +from pyformlang.cfg import CFG +from helper import generate_rnd_start_and_final + +try: + from project.task7 import cfpq_with_matrix + from project.task12 import typing_program, exec_program +except ImportError: + pytestmark = pytest.mark.skip("Task 12 is not ready to test!") + + +class TestTypeInference: + @pytest.mark.parametrize("program", WELL_TYPED) + def test_well_typed(self, program: str) -> None: + assert typing_program(program) + + @pytest.mark.parametrize("program", ILL_TYPED) + def test_ill_typed(self, program: str) -> None: + assert not typing_program(program) + + @pytest.mark.parametrize("grammar", GRAMMARS_DIFFERENT) + def test_exec_simple(self, graph: MultiDiGraph, grammar: CFG): + start_nodes, final_nodes = generate_rnd_start_and_final(graph) + program = Program(graph, grammar, start_nodes, final_nodes) + assert typing_program(program) + cfpq_from_prog = exec_program(str(program))[program.result_name] + cfpq_from_algo = cfpq_with_matrix(grammar, graph, start_nodes, final_nodes) + assert cfpq_from_prog == cfpq_from_algo diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py new file mode 100644 index 000000000..cde11c722 --- /dev/null +++ b/tests/autotests/to_program_parser.py @@ -0,0 +1,244 @@ +from copy import copy +import functools +import itertools + +import pyformlang as pl +from pyformlang.cfg import Variable, Terminal +import networkx as nx + +LABEL = "label" + + +class FreshVar: + var_counter = 0 + + @classmethod + def generate_fresh(cls, var: str) -> str: + cls.var_counter += 1 + return f"{var}{cls.var_counter}" + + +class Program: + EPS = '"a"^[0]' + nonterminal_names = {} + result_name = "" + + @staticmethod + def _nonterminal_to_string(nonterminal: Variable) -> str: + return nonterminal.to_text().lower() + + @staticmethod + def _terminal_to_string(terminal: Terminal) -> str: + terminal_s = terminal.to_text().lower() + if len(terminal_s) == 1: + return f'"{terminal_s}"' + res = "" + for key, group in itertools.groupby(terminal_s): + dot = " . " if res != "" else "" + res += f'{dot}"{key}"^[{len(group)}]' + return res + + def __init__( + self, + graph: nx.MultiDiGraph, + cfg: pl.cfg.CFG, + start_nodes: set[int], + final_nodes=None, + ): + self.graph = graph.copy() + self.graph.name = ( + FreshVar.generate_fresh(graph.name) + if graph.name != "" + else FreshVar.generate_fresh("g") + ) + self.grammar = copy(cfg) + self.start_nodes = start_nodes + self.final_nodes = final_nodes + self.result_name = FreshVar.generate_fresh("r") + for production in cfg.productions: + if production.head not in self.nonterminal_names.keys(): + self.nonterminal_names[production.head] = FreshVar.generate_fresh( + self._nonterminal_to_string(production.head) + ) + + def _graph_to_program(self) -> str: + program = f"let {self.graph.name} is graph" + for node_from, node_to, data in self.graph.edges(data=True): + program += f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.graph.name}' + return program + + def _object_to_string(self, cfg_object: Variable | Terminal) -> str: + if isinstance(cfg_object, Variable): + return self.nonterminal_names[cfg_object] + return self._terminal_to_string(cfg_object) + + def _objs_to_expr(self, objects: list[pl.cfg.production.CFGObject]) -> str: + if len(objects) == 0: + return self.EPS + return functools.reduce( + lambda acc, obj: f"{acc}{' . ' if acc != '' else ''}{self._object_to_string(obj)}", + objects, + "", + ) + + def _objs_alts(self, objects: list[list[pl.cfg.production.CFGObject]]) -> str: + return functools.reduce( + lambda acc, objs: f"{acc}{' | ' if acc != '' else ''}{self._objs_to_expr(objs)}", + objects, + "", + ) + + def _cfg_to_program(self) -> str: + res = "" + vars_dict: dict[pl.cfg.Variable, list[list[pl.cfg.production.CFGObject]]] = {} + for production in self.grammar.productions: + head = production.head + body = production.body + if head in vars_dict.keys(): + vars_dict[head].append(body) + else: + vars_dict[head] = [body] + for nonterminal in vars_dict.keys(): + res += f"\nlet {self.nonterminal_names[nonterminal]} = {self._objs_alts(vars_dict[nonterminal])}" + return res + + def _query_to_program(self) -> str: + query_name = self.nonterminal_names[self.grammar.start_symbol] + start_set_expr = ( + "[" + + functools.reduce( + lambda acc, x: f"{acc}{', ' if acc != '' else ''}{str(x)}", + self.start_nodes, + "", + ) + + "]" + ) + if len(self.final_nodes) == 0: + return f"\nlet {self.result_name} = for v in {start_set_expr} return u, v where u reachable from v in {self.graph.name} by {query_name}" + final_set_expr = ( + "[" + + functools.reduce( + lambda acc, x: f"{acc}{', ' if acc != '' else ''}{str(x)}", + self.final_nodes, + "", + ) + + "]" + ) + return ( + f"\nlet {self.result_name} = for v in {start_set_expr} for u in {final_set_expr} return u, v where u reachable from v in {self.graph.name} " + f"by {query_name}" + ) + + def __str__(self): + program = "" + program += self._graph_to_program() + cfg_pr = self._cfg_to_program() + program += cfg_pr + program += self._query_to_program() + return program + + +WELL_TYPED = [ + """ + let p = "a" . p . "b" | "c" + let q = ("s" . "f") ^ [1..] + let g = q & p""", + """ + let p = "a" . "b" | "c" + let q = ("s" . "f") ^ [1..] + let g = [q, p] + """, + """ + let p = (1,"a",2) + let g is graph + remove edge p from g + """, + """ + let p = 1 + let g is graph + remove vertex p from g + """, + """ + let p = [1,2] + let g is graph + remove vertices p from g + """, + """ + let p = "a" . p . "b" | "c" + let g is graph + let r1 = + return v + where u reachable from v in g by p + let q = "s" . q . "f" | "e" + let r2 = + for v in r1 + return u,v + where u reachable from v in g by q + """, + """ + let p = "a" . p . "b" | "c" + let g is graph + remove vertices + return v + where u reachable from v in g by p + from g + """, + """ + let p = "a" . p . "b" | "c" + let g is graph + let q = "c" ^ [1..] + let r = + for v in + return v + where u reachable from v in g by p + return u,v + where v reachable from u in g by q + """, +] + +ILL_TYPED = [ + """ + let p = "a" . p . "b" | "c" + let q = "s" . q . "f" | "e" + let g = q & p + """, + """ + let p = "a" . p . "b" | "c" + let q = ("s" . "f") ^ [1..] + let g = [q, p] + """, + """ + let p = "a" . p . "b" | "c" + let g is graph + let r1 = + return u,v + where u reachable from v in g by p + let q = "s" . q . "f" | "e" + let r2 = + for v in r1 + return u,v + where u reachable from v in g by q + """, + """ + let p = "a" . p . "b" | "c" + let g is graph + remove edge p from g + """, + """ + let p = (1,"a",2) + let g is graph + remove vertex p from g + """, + """ + let p = 1 + let g is graph + remove vertices p from g + """, + """ + let p = 1 + let g is graph + let x = + return v + where v reachable from u in g by p + """, +] From 5707dd69b8995edc87541792f44d5ef25e281650 Mon Sep 17 00:00:00 2001 From: Nikolai Ponomarev Date: Sat, 25 May 2024 14:47:45 +0300 Subject: [PATCH 2/6] Skip execution tests & change examples --- tests/autotests/test_task12.py | 1 + tests/autotests/to_program_parser.py | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/autotests/test_task12.py b/tests/autotests/test_task12.py index dcb1ab8c1..8772ce508 100644 --- a/tests/autotests/test_task12.py +++ b/tests/autotests/test_task12.py @@ -22,6 +22,7 @@ def test_well_typed(self, program: str) -> None: def test_ill_typed(self, program: str) -> None: assert not typing_program(program) + @pytest.mark.skip("Has errors") @pytest.mark.parametrize("grammar", GRAMMARS_DIFFERENT) def test_exec_simple(self, graph: MultiDiGraph, grammar: CFG): start_nodes, final_nodes = generate_rnd_start_and_final(graph) diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py index cde11c722..a3ff99cfe 100644 --- a/tests/autotests/to_program_parser.py +++ b/tests/autotests/to_program_parser.py @@ -144,9 +144,13 @@ def __str__(self): let q = ("s" . "f") ^ [1..] let g = q & p""", """ - let p = "a" . "b" | "c" - let q = ("s" . "f") ^ [1..] - let g = [q, p] + let a = ("a" . b) | "a" ^ [0] + let b = a . "b" + """, + """ + let q = "a" . p + let p = "b" . r + let r = ("c" . r) | "c" ^ [0] """, """ let p = (1,"a",2) @@ -208,6 +212,11 @@ def __str__(self): let g = [q, p] """, """ + let p = "a" . "b" | "c" + let q = ("s" . "f") ^ [1..] + let g = [q, p] + """, + """ let p = "a" . p . "b" | "c" let g is graph let r1 = From 7208040f0929f2a7a6f0f0638fe16a7fe05c54e4 Mon Sep 17 00:00:00 2001 From: Efim Kubishkin Date: Tue, 21 May 2024 12:18:34 +0300 Subject: [PATCH 3/6] Rework Program class --- tasks/task12.md | 1 - tests/autotests/to_program_parser.py | 142 +++++++++++++++++++-------- 2 files changed, 99 insertions(+), 44 deletions(-) diff --git a/tasks/task12.md b/tasks/task12.md index 8ec2a12d5..222d022c4 100644 --- a/tasks/task12.md +++ b/tasks/task12.md @@ -23,7 +23,6 @@ Требуемые функции: ```python -# возвращает пару: корректна ли программа и сообщение об ошибке (None, если корректна) def typing_program(program: str) -> bool: pass diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py index a3ff99cfe..f5c978f78 100644 --- a/tests/autotests/to_program_parser.py +++ b/tests/autotests/to_program_parser.py @@ -5,8 +5,7 @@ import pyformlang as pl from pyformlang.cfg import Variable, Terminal import networkx as nx - -LABEL = "label" +from constants import LABEL class FreshVar: @@ -18,59 +17,67 @@ def generate_fresh(cls, var: str) -> str: return f"{var}{cls.var_counter}" -class Program: - EPS = '"a"^[0]' - nonterminal_names = {} - result_name = "" +def _nonterminal_to_string(nonterminal: Variable) -> str: + return nonterminal.to_text().lower() - @staticmethod - def _nonterminal_to_string(nonterminal: Variable) -> str: - return nonterminal.to_text().lower() - @staticmethod - def _terminal_to_string(terminal: Terminal) -> str: - terminal_s = terminal.to_text().lower() - if len(terminal_s) == 1: - return f'"{terminal_s}"' - res = "" - for key, group in itertools.groupby(terminal_s): - dot = " . " if res != "" else "" +def _terminal_to_string(terminal: Terminal) -> str: + """ + convert terminal symbol into char + :param terminal: terminal symbol + :return: + """ + terminal_s = terminal.to_text().lower() + if len(terminal_s) == 1: + return f'"{terminal_s}"' + res = "" + for key, group in itertools.groupby(terminal_s): + dot = " . " if res != "" else "" + if len(group) > 1: res += f'{dot}"{key}"^[{len(group)}]' - return res + else: + res += f'{dot}"{key}"' + return res - def __init__( - self, - graph: nx.MultiDiGraph, - cfg: pl.cfg.CFG, - start_nodes: set[int], - final_nodes=None, - ): + +class GraphProgram: + def __init__(self, graph: nx.MultiDiGraph): self.graph = graph.copy() - self.graph.name = ( + self.name = ( FreshVar.generate_fresh(graph.name) if graph.name != "" else FreshVar.generate_fresh("g") ) - self.grammar = copy(cfg) - self.start_nodes = start_nodes - self.final_nodes = final_nodes - self.result_name = FreshVar.generate_fresh("r") - for production in cfg.productions: - if production.head not in self.nonterminal_names.keys(): - self.nonterminal_names[production.head] = FreshVar.generate_fresh( - self._nonterminal_to_string(production.head) - ) - def _graph_to_program(self) -> str: + def __str__(self): program = f"let {self.graph.name} is graph" for node_from, node_to, data in self.graph.edges(data=True): program += f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.graph.name}' return program + +class GrammarProgram: + nonterminal_names = {} + EPS = '"a"^[0]' + + def __init__(self, cfg: pl.cfg.CFG): + self.grammar = copy(cfg) + for production in cfg.productions: + if production.head not in self.nonterminal_names.keys(): + self.nonterminal_names[production.head] = FreshVar.generate_fresh( + _nonterminal_to_string(production.head) + ) + self.start_nonterminal_name = self.nonterminal_names[cfg.start_symbol] + def _object_to_string(self, cfg_object: Variable | Terminal) -> str: + """ + convert nonterminal or terminal symbol into program representation + :param cfg_object: terminal or nonterminal + :return: an object view of "x", if object is terminal or x, if object is nonterminal + """ if isinstance(cfg_object, Variable): return self.nonterminal_names[cfg_object] - return self._terminal_to_string(cfg_object) + return _terminal_to_string(cfg_object) def _objs_to_expr(self, objects: list[pl.cfg.production.CFGObject]) -> str: if len(objects) == 0: @@ -88,7 +95,7 @@ def _objs_alts(self, objects: list[list[pl.cfg.production.CFGObject]]) -> str: "", ) - def _cfg_to_program(self) -> str: + def __str__(self) -> str: res = "" vars_dict: dict[pl.cfg.Variable, list[list[pl.cfg.production.CFGObject]]] = {} for production in self.grammar.productions: @@ -102,8 +109,25 @@ def _cfg_to_program(self) -> str: res += f"\nlet {self.nonterminal_names[nonterminal]} = {self._objs_alts(vars_dict[nonterminal])}" return res - def _query_to_program(self) -> str: - query_name = self.nonterminal_names[self.grammar.start_symbol] + +class QueryProgram: + def __init__( + self, + graph_program: GraphProgram, + grammar_program: GrammarProgram, + start_nodes: set[int], + final_nodes: set[int] = None, + ): + if final_nodes is None: + final_nodes = set() + self.graph_program = graph_program + self.grammar_program = grammar_program + self.result_name = FreshVar.generate_fresh("r") + self.start_nodes = start_nodes + self.final_nodes = final_nodes + + def __str__(self) -> str: + query_name = self.grammar_program.start_nonterminal_name start_set_expr = ( "[" + functools.reduce( @@ -114,7 +138,10 @@ def _query_to_program(self) -> str: + "]" ) if len(self.final_nodes) == 0: - return f"\nlet {self.result_name} = for v in {start_set_expr} return u, v where u reachable from v in {self.graph.name} by {query_name}" + return ( + f"\nlet {self.result_name} = for v in {start_set_expr} return u, v where u reachable from v in " + f"{self.graph_program.name} by {query_name}" + ) final_set_expr = ( "[" + functools.reduce( @@ -125,10 +152,39 @@ def _query_to_program(self) -> str: + "]" ) return ( - f"\nlet {self.result_name} = for v in {start_set_expr} for u in {final_set_expr} return u, v where u reachable from v in {self.graph.name} " - f"by {query_name}" + f"\nlet {self.result_name} = for v in {start_set_expr} for u in {final_set_expr} return u, v where u " + f"reachable from v in {self.graph_program.name} by {query_name}" ) + +class Program: + EPS = '"a"^[0]' + nonterminal_names = {} + result_name = "" + + def __init__( + self, + graph: nx.MultiDiGraph, + cfg: pl.cfg.CFG, + start_nodes: set[int], + final_nodes=None, + ): + self.graph = graph.copy() + self.graph.name = ( + FreshVar.generate_fresh(graph.name) + if graph.name != "" + else FreshVar.generate_fresh("g") + ) + self.grammar = copy(cfg) + self.start_nodes = start_nodes + self.final_nodes = final_nodes + self.result_name = FreshVar.generate_fresh("r") + for production in cfg.productions: + if production.head not in self.nonterminal_names.keys(): + self.nonterminal_names[production.head] = FreshVar.generate_fresh( + _nonterminal_to_string(production.head) + ) + def __str__(self): program = "" program += self._graph_to_program() From 582cefd8fa2c76fbef2438b387b8cfd2982ce9f3 Mon Sep 17 00:00:00 2001 From: Efim Kubishkin Date: Sun, 26 May 2024 18:46:25 +0300 Subject: [PATCH 4/6] Add complex test Add test with multi grammars and graphs. Add some comments and functions into to_program_parser --- tests/autotests/test_task12.py | 61 +++++++++++++++++++++-- tests/autotests/to_program_parser.py | 73 ++++++++++++++-------------- 2 files changed, 93 insertions(+), 41 deletions(-) diff --git a/tests/autotests/test_task12.py b/tests/autotests/test_task12.py index 8772ce508..6ac2eb312 100644 --- a/tests/autotests/test_task12.py +++ b/tests/autotests/test_task12.py @@ -1,10 +1,20 @@ +import random + import pytest -from to_program_parser import WELL_TYPED, ILL_TYPED, Program +from to_program_parser import ( + WELL_TYPED, + ILL_TYPED, + GraphProgram, + GrammarProgram, + QueryProgram, + to_program_parser, +) from fixtures import graph from grammars_constants import GRAMMARS_DIFFERENT from networkx import MultiDiGraph from pyformlang.cfg import CFG -from helper import generate_rnd_start_and_final +from helper import generate_rnd_start_and_final, generate_rnd_dense_graph +from constants import LABELS try: from project.task7 import cfpq_with_matrix @@ -22,12 +32,53 @@ def test_well_typed(self, program: str) -> None: def test_ill_typed(self, program: str) -> None: assert not typing_program(program) - @pytest.mark.skip("Has errors") @pytest.mark.parametrize("grammar", GRAMMARS_DIFFERENT) def test_exec_simple(self, graph: MultiDiGraph, grammar: CFG): start_nodes, final_nodes = generate_rnd_start_and_final(graph) - program = Program(graph, grammar, start_nodes, final_nodes) + graph_prog = GraphProgram(graph) + cfg_prog = GrammarProgram(grammar) + query = QueryProgram(graph_prog, cfg_prog, start_nodes, final_nodes) + program = query.full_program() assert typing_program(program) - cfpq_from_prog = exec_program(str(program))[program.result_name] + cfpq_from_prog = exec_program(program)[query.result_name] cfpq_from_algo = cfpq_with_matrix(grammar, graph, start_nodes, final_nodes) assert cfpq_from_prog == cfpq_from_algo + + # @pytest.mark.parametrize("queries_count", [2, 3, 5]) + # def test_exec_one_graph_many_queries(self, graph, queries_count): + # start_nodes, final_nodes = generate_rnd_start_and_final(graph) + # graph_prog = GraphProgram(graph) + # query_list = [] + # for i in range(queries_count): + # grammar_prog = GrammarProgram(random.choice(GRAMMARS_DIFFERENT)) + # query_list.append(QueryProgram(graph_prog, grammar_prog, start_nodes, final_nodes)) + # program, name_result = to_program_parser(query_list) + # result_dict: dict = exec_program(program) + # for var, res in result_dict.items(): + # query = name_result[var] + # separate_res = exec_program(query.full_program()) + # assert separate_res == res + # assert res == cfpq_with_matrix(query.get_grammar(), query.get_graph(), query.start_nodes, query.final_nodes) + @pytest.mark.parametrize("queries_count", [1, 3, 5]) + def test_exec_many_graphs_many_queries(self, queries_count): + query_list = [] + for i in range(queries_count): + graph = generate_rnd_dense_graph(1, 40, LABELS) + grammar_prog = GrammarProgram(random.choice(GRAMMARS_DIFFERENT)) + graph_prog = GraphProgram(graph) + start_nodes, final_nodes = generate_rnd_start_and_final(graph) + query_list.append( + QueryProgram(graph_prog, grammar_prog, start_nodes, final_nodes) + ) + program, name_result = to_program_parser(query_list) + result_dict: dict = exec_program(program) + for var, res in result_dict.items(): + query = name_result[var] + separate_res = exec_program(query.full_program()) + assert separate_res == res + assert res == cfpq_with_matrix( + query.get_grammar(), + query.get_graph(), + query.start_nodes, + query.final_nodes, + ) diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py index f5c978f78..ded9df86f 100644 --- a/tests/autotests/to_program_parser.py +++ b/tests/autotests/to_program_parser.py @@ -50,7 +50,7 @@ def __init__(self, graph: nx.MultiDiGraph): ) def __str__(self): - program = f"let {self.graph.name} is graph" + program = f"\nlet {self.graph.name} is graph" for node_from, node_to, data in self.graph.edges(data=True): program += f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.graph.name}' return program @@ -126,7 +126,17 @@ def __init__( self.start_nodes = start_nodes self.final_nodes = final_nodes - def __str__(self) -> str: + def get_graph(self): + return self.graph_program.graph + + def get_grammar(self): + return self.grammar_program.grammar + + def query_program(self) -> str: + """ + if you want only want to get query + :return: just select expression + """ query_name = self.grammar_program.start_nonterminal_name start_set_expr = ( "[" @@ -156,42 +166,33 @@ def __str__(self) -> str: f"reachable from v in {self.graph_program.name} by {query_name}" ) + def full_program(self) -> str: + """ + if you want to work with query as meaningful object + :return: fully query with predefined graph and grammar + """ + return f"\n{self.graph_program}\n{self.grammar_program}\n{self.query_program()}" -class Program: - EPS = '"a"^[0]' - nonterminal_names = {} - result_name = "" - - def __init__( - self, - graph: nx.MultiDiGraph, - cfg: pl.cfg.CFG, - start_nodes: set[int], - final_nodes=None, - ): - self.graph = graph.copy() - self.graph.name = ( - FreshVar.generate_fresh(graph.name) - if graph.name != "" - else FreshVar.generate_fresh("g") - ) - self.grammar = copy(cfg) - self.start_nodes = start_nodes - self.final_nodes = final_nodes - self.result_name = FreshVar.generate_fresh("r") - for production in cfg.productions: - if production.head not in self.nonterminal_names.keys(): - self.nonterminal_names[production.head] = FreshVar.generate_fresh( - _nonterminal_to_string(production.head) - ) - def __str__(self): - program = "" - program += self._graph_to_program() - cfg_pr = self._cfg_to_program() - program += cfg_pr - program += self._query_to_program() - return program +def to_program_parser( + query_list: list[QueryProgram], +) -> tuple[str, dict[str, QueryProgram]]: + result_program = "" + res_name_query = {} + grammar_set = set() + graph_set = set() + for query in query_list: + # if graph is already defined then it is not necessary to define it again + if query.graph_program not in graph_set: + result_program += str(query.graph_program) + graph_set.add(query.graph_program) + # same with grammar + if query.grammar_program not in grammar_set: + result_program += str(query.grammar_program) + grammar_set.add(query.grammar_program) + result_program += query.query_program() + res_name_query.update({query.result_name: query}) + return result_program, res_name_query WELL_TYPED = [ From 6b19e08558f121d11bea118f02d73645d7a6d41c Mon Sep 17 00:00:00 2001 From: Efim Kubishkin Date: Wed, 5 Jun 2024 14:17:50 +0300 Subject: [PATCH 5/6] Rework QueryProgram and GrammarProgram --- tests/autotests/test_task12.py | 46 +++++++++++++------------- tests/autotests/to_program_parser.py | 49 ++++------------------------ 2 files changed, 29 insertions(+), 66 deletions(-) diff --git a/tests/autotests/test_task12.py b/tests/autotests/test_task12.py index 6ac2eb312..9b629f7c1 100644 --- a/tests/autotests/test_task12.py +++ b/tests/autotests/test_task12.py @@ -1,4 +1,5 @@ import random +from copy import deepcopy import pytest from to_program_parser import ( @@ -35,40 +36,37 @@ def test_ill_typed(self, program: str) -> None: @pytest.mark.parametrize("grammar", GRAMMARS_DIFFERENT) def test_exec_simple(self, graph: MultiDiGraph, grammar: CFG): start_nodes, final_nodes = generate_rnd_start_and_final(graph) - graph_prog = GraphProgram(graph) - cfg_prog = GrammarProgram(grammar) - query = QueryProgram(graph_prog, cfg_prog, start_nodes, final_nodes) + graph_prog = GraphProgram(deepcopy(graph)) + cfg_prog = GrammarProgram(deepcopy(grammar)) + query = QueryProgram( + graph_prog, cfg_prog, deepcopy(start_nodes), deepcopy(final_nodes) + ) program = query.full_program() - assert typing_program(program) - cfpq_from_prog = exec_program(program)[query.result_name] - cfpq_from_algo = cfpq_with_matrix(grammar, graph, start_nodes, final_nodes) + assert typing_program(deepcopy(program)) + cfpq_from_prog = exec_program(deepcopy(program))[query.result_name] + cfpq_from_algo = cfpq_with_matrix( + deepcopy(grammar), + deepcopy(graph), + deepcopy(start_nodes), + deepcopy(final_nodes), + ) assert cfpq_from_prog == cfpq_from_algo - # @pytest.mark.parametrize("queries_count", [2, 3, 5]) - # def test_exec_one_graph_many_queries(self, graph, queries_count): - # start_nodes, final_nodes = generate_rnd_start_and_final(graph) - # graph_prog = GraphProgram(graph) - # query_list = [] - # for i in range(queries_count): - # grammar_prog = GrammarProgram(random.choice(GRAMMARS_DIFFERENT)) - # query_list.append(QueryProgram(graph_prog, grammar_prog, start_nodes, final_nodes)) - # program, name_result = to_program_parser(query_list) - # result_dict: dict = exec_program(program) - # for var, res in result_dict.items(): - # query = name_result[var] - # separate_res = exec_program(query.full_program()) - # assert separate_res == res - # assert res == cfpq_with_matrix(query.get_grammar(), query.get_graph(), query.start_nodes, query.final_nodes) @pytest.mark.parametrize("queries_count", [1, 3, 5]) def test_exec_many_graphs_many_queries(self, queries_count): query_list = [] for i in range(queries_count): graph = generate_rnd_dense_graph(1, 40, LABELS) + start_nodes, final_nodes = generate_rnd_start_and_final(deepcopy(graph)) grammar_prog = GrammarProgram(random.choice(GRAMMARS_DIFFERENT)) - graph_prog = GraphProgram(graph) - start_nodes, final_nodes = generate_rnd_start_and_final(graph) + graph_prog = GraphProgram(deepcopy(graph)) query_list.append( - QueryProgram(graph_prog, grammar_prog, start_nodes, final_nodes) + QueryProgram( + graph_prog, + grammar_prog, + deepcopy(start_nodes), + deepcopy(final_nodes), + ) ) program, name_result = to_program_parser(query_list) result_dict: dict = exec_program(program) diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py index ded9df86f..cf1ee5fd7 100644 --- a/tests/autotests/to_program_parser.py +++ b/tests/autotests/to_program_parser.py @@ -1,6 +1,4 @@ from copy import copy -import functools -import itertools import pyformlang as pl from pyformlang.cfg import Variable, Terminal @@ -25,19 +23,10 @@ def _terminal_to_string(terminal: Terminal) -> str: """ convert terminal symbol into char :param terminal: terminal symbol - :return: + :return: an object view of "x" """ terminal_s = terminal.to_text().lower() - if len(terminal_s) == 1: - return f'"{terminal_s}"' - res = "" - for key, group in itertools.groupby(terminal_s): - dot = " . " if res != "" else "" - if len(group) > 1: - res += f'{dot}"{key}"^[{len(group)}]' - else: - res += f'{dot}"{key}"' - return res + return f'"{terminal_s}"' class GraphProgram: @@ -58,7 +47,7 @@ def __str__(self): class GrammarProgram: nonterminal_names = {} - EPS = '"a"^[0]' + EPS = '"a"^[0 .. 0]' def __init__(self, cfg: pl.cfg.CFG): self.grammar = copy(cfg) @@ -82,18 +71,10 @@ def _object_to_string(self, cfg_object: Variable | Terminal) -> str: def _objs_to_expr(self, objects: list[pl.cfg.production.CFGObject]) -> str: if len(objects) == 0: return self.EPS - return functools.reduce( - lambda acc, obj: f"{acc}{' . ' if acc != '' else ''}{self._object_to_string(obj)}", - objects, - "", - ) + return " . ".join(map(self._object_to_string, objects)) def _objs_alts(self, objects: list[list[pl.cfg.production.CFGObject]]) -> str: - return functools.reduce( - lambda acc, objs: f"{acc}{' | ' if acc != '' else ''}{self._objs_to_expr(objs)}", - objects, - "", - ) + return " | ".join(map(self._objs_to_expr, objects)) def __str__(self) -> str: res = "" @@ -138,29 +119,13 @@ def query_program(self) -> str: :return: just select expression """ query_name = self.grammar_program.start_nonterminal_name - start_set_expr = ( - "[" - + functools.reduce( - lambda acc, x: f"{acc}{', ' if acc != '' else ''}{str(x)}", - self.start_nodes, - "", - ) - + "]" - ) + start_set_expr = f"[{', '.join(map(str, self.start_nodes))}]" if len(self.final_nodes) == 0: return ( f"\nlet {self.result_name} = for v in {start_set_expr} return u, v where u reachable from v in " f"{self.graph_program.name} by {query_name}" ) - final_set_expr = ( - "[" - + functools.reduce( - lambda acc, x: f"{acc}{', ' if acc != '' else ''}{str(x)}", - self.final_nodes, - "", - ) - + "]" - ) + final_set_expr = f"[{', '.join(map(str, self.final_nodes))}]" return ( f"\nlet {self.result_name} = for v in {start_set_expr} for u in {final_set_expr} return u, v where u " f"reachable from v in {self.graph_program.name} by {query_name}" From 005aca739984d9a275f1ef64fde1fac1bf2e3628 Mon Sep 17 00:00:00 2001 From: Efim Kubishkin Date: Mon, 17 Jun 2024 17:18:30 +0300 Subject: [PATCH 6/6] Fix graph name and add typing check --- tests/autotests/test_task12.py | 7 +++++-- tests/autotests/to_program_parser.py | 12 +++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/autotests/test_task12.py b/tests/autotests/test_task12.py index 9b629f7c1..637f5f74d 100644 --- a/tests/autotests/test_task12.py +++ b/tests/autotests/test_task12.py @@ -69,10 +69,13 @@ def test_exec_many_graphs_many_queries(self, queries_count): ) ) program, name_result = to_program_parser(query_list) - result_dict: dict = exec_program(program) + assert typing_program(deepcopy(program)) + result_dict: dict = exec_program(deepcopy(program)) for var, res in result_dict.items(): query = name_result[var] - separate_res = exec_program(query.full_program()) + query_full_program = query.full_program() + assert typing_program(deepcopy(query_full_program)) + separate_res = exec_program(deepcopy(query_full_program)) assert separate_res == res assert res == cfpq_with_matrix( query.get_grammar(), diff --git a/tests/autotests/to_program_parser.py b/tests/autotests/to_program_parser.py index cf1ee5fd7..66214b3af 100644 --- a/tests/autotests/to_program_parser.py +++ b/tests/autotests/to_program_parser.py @@ -1,4 +1,4 @@ -from copy import copy +from copy import copy, deepcopy import pyformlang as pl from pyformlang.cfg import Variable, Terminal @@ -39,22 +39,24 @@ def __init__(self, graph: nx.MultiDiGraph): ) def __str__(self): - program = f"\nlet {self.graph.name} is graph" + program = f"\nlet {self.name} is graph" for node_from, node_to, data in self.graph.edges(data=True): - program += f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.graph.name}' + program += ( + f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.name}' + ) return program class GrammarProgram: - nonterminal_names = {} EPS = '"a"^[0 .. 0]' def __init__(self, cfg: pl.cfg.CFG): self.grammar = copy(cfg) + self.nonterminal_names = {} for production in cfg.productions: if production.head not in self.nonterminal_names.keys(): self.nonterminal_names[production.head] = FreshVar.generate_fresh( - _nonterminal_to_string(production.head) + _nonterminal_to_string(deepcopy(production.head)) ) self.start_nonterminal_name = self.nonterminal_names[cfg.start_symbol]