Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Proposal: Delta Debugging Based on Parse Trees #23

Closed
rindPHI opened this issue Mar 11, 2021 · 2 comments
Closed

Feature Proposal: Delta Debugging Based on Parse Trees #23

rindPHI opened this issue Mar 11, 2021 · 2 comments

Comments

@rindPHI
Copy link

rindPHI commented Mar 11, 2021

For my work on learning symbolic inputs, I wrote a variant of delta debugging which reduces parse trees based on an existing grammar. It works quite well and reasonably fast so far. I do not know whether it is relevant for the Debugging Book, and the code is quite lengthy and not didactically optimized, but thought I'd share it here anyway.

After the below code implementing the reduction, I'll also post an example processing CSV.

import itertools
import logging
import re

from fuzzingbook.GrammarFuzzer import tree_to_string
from fuzzingbook.Grammars import RE_NONTERMINAL, is_nonterminal
from fuzzingbook.Parser import EarleyParser


class Reducer:
    def __init__(self, grammar,
                 concrete_input,
                 test,
                 expected_exc_type=Exception,
                 max_tries_for_generalization=50,
                 max_tries_for_validation=200,
                 max_retries_after_failed_validation=3):
        self.grammar = grammar
        self.test = test
        self.max_tries_for_generalization = max_tries_for_generalization
        self.max_tries_for_validation = max_tries_for_validation
        self.max_retries_after_failed_validation = max_retries_after_failed_validation
        self.expected_exc_type = expected_exc_type
        self.logger = logging.getLogger("Reducer")
        self.concrete_input = concrete_input

        if max_tries_for_validation < max_tries_for_generalization:
            self.max_tries_for_validation = 2 * max_tries_for_generalization

    def reduce(self, concrete_input=None):
        """Implements the idea of Delta Debugging for input trees."""
        self.logger.info("PHASE START: Reduction à la Delta Debugging.")

        if concrete_input is None:
            concrete_input = self.concrete_input

        parse_tree = list(EarleyParser(self.grammar).parse(concrete_input))[0]

        did_generalize = True
        while did_generalize:
            paths = [[0]]
            did_generalize = False

            while paths:
                path = paths.pop()

                node, children = Reducer.get_subtree(parse_tree, path)
                tree_for_log = tree_to_string((node, children))

                compatible_subtree_paths = []
                for i, child in enumerate(children):
                    compatible_paths = self.find_paths(child, lambda p, t: t[0] == node)
                    if compatible_paths:
                        compatible_subtree_paths.extend(list(map(lambda p: path + [i] + p, compatible_paths)))

                did_replace = False

                # Step 1: Try to replace this subtree by one of its subtree
                for compatible_path in compatible_subtree_paths:
                    subtree = Reducer.get_subtree(parse_tree, compatible_path)
                    maybe_new_input = Reducer.replace_subtree(parse_tree, subtree, path)
                    if self.test_input(tree_to_string(maybe_new_input)):
                        self.logger.info(f"Generalizing path {path}, "
                                         f"replacing {tree_to_string((node, children))} "
                                         f"with {tree_to_string(subtree)}")
                        parse_tree = maybe_new_input
                        self.logger.info(f"Current input: {tree_to_string(parse_tree)}")
                        did_replace, did_generalize = True, True
                        break

                # Step 2: Try to choose another expansion in the grammar to replace the
                # children set by just one child or fewer children.
                if not did_replace:
                    expansions = [list(filter(lambda l: l, re.split(RE_NONTERMINAL, e)))
                                  for e in self.grammar.get(node, [])]

                    current_expansion = [node for node in [c[0] for c in children]]

                    for elements in [e for e in expansions if e != current_expansion]:
                        # Find matching subtrees for each nonterminal
                        nonterminals = [e for e in elements if is_nonterminal(e)]
                        compatible_subtree_paths = [None] * len(nonterminals)
                        for k, nonterminal in enumerate(nonterminals):
                            compatible_subtree_paths[k] = []
                            for i, child in enumerate(children):
                                compatible_paths = self.find_paths(child, lambda p, t: t[0] == nonterminal)
                                if compatible_paths:
                                    compatible_subtree_paths[k].extend(
                                        list(map(lambda p: path + [i] + p, compatible_paths)))

                        # What itertools.product does:
                        # [[1, 2], [3, 4, 5]] -> [[1, 3], [1, 4], [1, 5], [2, 3], ...]

                        # Try all instantiations
                        for combination in itertools.product(*compatible_subtree_paths):
                            i = 0
                            new_children = []
                            for element in elements:
                                if not is_nonterminal(element):
                                    if type(element) is not tuple:
                                        element = (element, [])
                                    new_children.append(element)
                                else:
                                    new_children.append(Reducer.get_subtree(parse_tree, combination[i]))
                                    i += 1

                            subtree = (node, new_children)

                            if Reducer.count_nodes((node, children)) <= Reducer.count_nodes((node, new_children)):
                                continue

                            maybe_new_input = Reducer.replace_subtree(parse_tree, subtree, path)
                            if self.test_input(tree_to_string(maybe_new_input)):
                                self.logger.info(f"Generalizing path {path}, "
                                                 f"replacing {tree_to_string((node, children))} "
                                                 f"with {tree_to_string(subtree)}")
                                parse_tree = maybe_new_input
                                self.logger.info(f"Current input: {str(parse_tree)}")
                                did_replace, did_generalize = True, True
                                break

                if not did_replace:
                    self.logger.info(f"Cannot generalize path {path}, expression {tree_for_log}")
                    if children is not None:
                        for i, child in enumerate(children):
                            child_symbol, _ = child
                            if child_symbol in self.grammar:
                                paths.append(path + [i])

        self.logger.info("PHASE END: Reduction à la Delta Debugging.")

        self.concrete_input = tree_to_string(parse_tree)

        return self.concrete_input

    def test_input(self, concrete_input):
        self.logger.debug(f"Testing {repr(concrete_input)}...")

        try:
            self.test(concrete_input)
        except Exception as exc:
            if issubclass(type(exc), self.expected_exc_type):
                self.logger.debug(f"FAIL ({type(exc).__name__})")
                return True
            else:
                self.logger.info(f"UNEXPECTED FAIL ({type(exc).__name__})")
                return False
        else:
            self.logger.debug(f"PASS")
            return False

    def find_paths(self, tree, predicate, path=None):
        """
        Return a list of all paths for which `predicate` holds.
        `predicate` is a function `predicate`(`path`, `tree`), where
        `path` denotes a subtree in `tree`. If `predicate()` returns
        True, `path` is included in the returned list.

        Taken from the Debugging Book.
        """

        if path is None:
            path = []

        symbol, children = Reducer.get_subtree(tree, path)

        if predicate(path, (symbol, children)):
            return [path]

        paths = []
        if children is not None:
            for i, child in enumerate(children):
                child_symbol, _ = child
                if child_symbol in self.grammar:
                    paths += self.find_paths(tree, predicate, path + [i])

        return paths

    @staticmethod
    def get_subtree(tree, path):
        """Access a subtree based on `path` (a list of children numbers)"""
        node, children = tree
        if not path:
            return tree

        return Reducer.get_subtree(children[path[0]], path[1:])

    @staticmethod
    def replace_subtree(tree, replacement_tree, path):
        """Returns a symbolic input with a new tree where replacement_tree has been inserted at `path`"""

        def recurse(tree, path):
            node, children = tree

            if not path:
                return replacement_tree

            head = path[0]
            new_children = (children[:head] +
                            [recurse(children[head], path[1:])] +
                            children[head + 1:])

            return node, new_children

        return recurse(tree, path)

    @staticmethod
    def dfs(tree, action=print):
        node, children = tree
        action(tree)
        for child in children:
            Reducer.dfs(child, action)

    @staticmethod
    def count_nodes(parse_tree):
        num = 0

        def count(t):
            nonlocal num
            num += 1

        Reducer.dfs(parse_tree, count)
        return num

The method read_csv_file reads a CSV file into a custom data structure. If one line has more fields than the header, an IndexError is thrown.

CSV_EBNF_GRAMMAR = {
    "<start>": ["<csv-file>"],
    "<csv-file>": ["<csv-record>*"],
    "<csv-record>": ["<csv-string-list>\n"],
    "<csv-string-list>": ["<raw-string>", "<raw-string>;<csv-string-list>"],
    "<raw-string>": ["<spaces>", "<spaces>?<raw-field><spaces>?"],
    "<raw-field>": ["<simple-field>", "<quoted-field>"],
    "<simple-field>": ["<simple-character>*"],
    "<simple-character>": [c for c in srange(string.printable) if c not in ["\n", ";", '"', " ", "\t", "\r"]],
    "<quoted-field>": ['"<escaped-field>"'],
    "<escaped-field>": ["<escaped-character>*"],
    "<escaped-character>": [c for c in srange(string.printable) if c not in ['"']],
    "<spaces>": [" ", " <spaces>"],
}

CSV_GRAMMAR = convert_ebnf_grammar(CSV_EBNF_GRAMMAR)

CSV_PARSER = EarleyParser(CSV_GRAMMAR)


class CSVFile:
    def __init__(self, header: list, lines: list[list]):
        self.header = header
        self.lines = []
        for line in lines:
            the_dict = {}
            for i, entry in enumerate(line):
                the_dict[header[i]] = entry
            self.lines.append(the_dict)

    def __getitem__(self, item):
        if type(item) is str:
            return [line[item] for line in self.lines]
        else:
            return self.lines[item]

    def __iter__(self):
        return self.lines.__iter__()

    def __str__(self):
        result = " ; ".join(self.header)
        if self.lines:
            result += "\n"
        result += "\n".join([" ; ".join(line.values()) for line in self.lines])
        return result


def read_csv_file(inp):
    tree = list(CSV_PARSER.parse(inp))[0]

    fields = []
    records = []

    def collect_records(t):
        nonlocal fields, records
        node, children = t
        if node == "<csv-record>":
            record = []
            for child in children:
                fields = []
                Reducer.dfs(child, collect_fields)
                if fields:
                    record.append(fields)
            records.extend(record)

    def collect_fields(t):
        nonlocal fields
        node, children = t
        if node == "<csv-string-list>":
            fields.append(tree_to_string(children[0]))

    Reducer.dfs(tree, collect_records)

    if len(records) < 1:
        raise SyntaxError("CSV file must contain at least a header line!")

    return CSVFile(records[0], records[1:])

It can be used as shown below:

buggy_input = """"Field 1";"Field 2";"Field 3"
17;23;42
5648;13459;"some
multiline
text"
1;2;"now:";4
100;200;300
"""

reducer = Reducer(
    CSV_GRAMMAR, buggy_input, read_csv_file, expected_exc_type=IndexError,
    max_tries_for_generalization=100)

result = reducer.reduce(buggy_input)

self.assertEqual("""""
"";
""", result)
@andreas-zeller
Copy link
Member

Actually, we already have a "syntactic" DD variant – just not in the debuggingbook, but in the fuzzingbook. Have a look here:

https://www.fuzzingbook.org/html/Reducer.html#Grammar-Based-Input-Reduction

In the debuggingbook DD chapter, I have now set up a pointer to this section.

@rindPHI
Copy link
Author

rindPHI commented Mar 11, 2021

Thanks very much for this super quick reply! I did not know about GrammarReducer, could have saved me some work. Unfortunately, I don't manage to get it to work as expected for my CSV example, the output I get is unreduced and contains nonterminal symbols on top. I just filed a bug report for the fuzzing book: uds-se/fuzzingbook#96

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants