From ba00e43f759e62539d28df1c592fa245ecf9c47a Mon Sep 17 00:00:00 2001 From: Anna Petrasova Date: Tue, 7 May 2024 21:46:14 -0400 Subject: [PATCH] grass.script: Automatically parse JSON and CSV in parse_command (#3687) --- python/grass/script/core.py | 35 +++++++++++++------ .../test_start_command_functions_nc.py | 35 ++++++++++++++++++- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/python/grass/script/core.py b/python/grass/script/core.py index 59d96577a4d..d027f941309 100644 --- a/python/grass/script/core.py +++ b/python/grass/script/core.py @@ -27,6 +27,9 @@ import string import random import shlex +import json +import csv +import io from tempfile import NamedTemporaryFile from pathlib import Path @@ -553,26 +556,35 @@ def read_command(*args, **kwargs): def parse_command(*args, **kwargs): """Passes all arguments to read_command, then parses the output - by parse_key_val(). + by default with parse_key_val(). - Parsing function can be optionally given by parse parameter - including its arguments, e.g. + If the command has parameter format and is called with + format=json, the output will be parsed into a dictionary. + Similarly, with format=csv the output will be parsed into + a list of lists (CSV rows). :: - parse_command(..., parse = (grass.parse_key_val, { 'sep' : ':' })) + parse_command("v.db.select", ..., format="json") - or you can simply define delimiter + Custom parsing function can be optionally given by parse parameter + including its arguments, e.g. :: - parse_command(..., delimiter = ':') + parse_command(..., parse=(gs.parse_key_val, {'sep': ':'})) + + Parameter delimiter is deprecated. :param args: list of unnamed arguments (see start_command() for details) :param kwargs: list of named arguments (see start_command() for details) :return: parsed module output """ + + def parse_csv(result): + return list(csv.DictReader(io.StringIO(result))) + parse = None parse_args = {} if "parse" in kwargs: @@ -580,13 +592,16 @@ def parse_command(*args, **kwargs): parse = kwargs["parse"][0] parse_args = kwargs["parse"][1] del kwargs["parse"] - - if "delimiter" in kwargs: - parse_args = {"sep": kwargs["delimiter"]} - del kwargs["delimiter"] + elif kwargs.get("format") == "json": + parse = json.loads + elif kwargs.get("format") == "csv": + parse = parse_csv if not parse: parse = parse_key_val # use default fn + if "delimiter" in kwargs: + parse_args = {"sep": kwargs["delimiter"]} + del kwargs["delimiter"] res = read_command(*args, **kwargs) diff --git a/python/grass/script/testsuite/test_start_command_functions_nc.py b/python/grass/script/testsuite/test_start_command_functions_nc.py index 22e2f4f5d21..4ad5d46f6e5 100644 --- a/python/grass/script/testsuite/test_start_command_functions_nc.py +++ b/python/grass/script/testsuite/test_start_command_functions_nc.py @@ -3,7 +3,8 @@ from grass.gunittest.case import TestCase from grass.gunittest.main import test -from grass.script.core import start_command, PIPE +from grass.script.core import parse_command, start_command, PIPE +from grass.script.utils import parse_key_val LOCATION = "nc" @@ -49,5 +50,37 @@ def test_multiple_underscores(self): self.assertIn(b"raster", stderr) +class TestParseCommand(TestCase): + """Tests parse_command""" + + def test_parse_default(self): + result = parse_command("r.info", map="elevation", flags="g") + self.assertTrue( + isinstance(result, dict) and isinstance(result.get("north"), str) + ) + result_2 = parse_command("r.info", map="elevation", flags="g", delimiter="=") + self.assertDictEqual(result, result_2) + result_3 = parse_command( + "r.info", map="elevation", flags="g", parse=(parse_key_val, {"sep": "="}) + ) + self.assertDictEqual(result, result_3) + + def test_parse_format_json(self): + result = parse_command( + "r.what", map="elevation", coordinates=(640000, 220000), format="json" + ) + self.assertTrue( + isinstance(result, list) + and isinstance(result[0].get("easting"), (int, float)) + ) + + def test_parse_format_csv(self): + reference = parse_command("v.db.select", map="zipcodes", format="json")[ + "records" + ] + result = parse_command("v.db.select", map="zipcodes", format="csv") + self.assertListEqual(list(reference[0].keys()), list(result[0].keys())) + + if __name__ == "__main__": test()