From ba4d7d64b6ece2632b217613ab56776a11ab441f Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Thu, 19 Oct 2023 08:59:44 -0400 Subject: [PATCH] fix: case-insensitive bug for items in brackets for elastic_ecs --- .../stix_translation/query_constructor.py | 58 ++++++++++++++----- .../test_elastic_ecs_stix_to_query.py | 38 +++++++----- 2 files changed, 67 insertions(+), 29 deletions(-) diff --git a/stix_shifter_modules/elastic_ecs/stix_translation/query_constructor.py b/stix_shifter_modules/elastic_ecs/stix_translation/query_constructor.py index 658b7d2f2..d3dbdc3e9 100644 --- a/stix_shifter_modules/elastic_ecs/stix_translation/query_constructor.py +++ b/stix_shifter_modules/elastic_ecs/stix_translation/query_constructor.py @@ -335,15 +335,15 @@ def _unfold_case_insensitive_regex(regex_pattern): # xsb: xs inside bracket xsb = xs[1::2] - xsb[ci_index//2:] = ["[" + _unfold_ci_chars(x, True) + "]" for x in xsb[ci_index//2:]] + xsb[ci_index//2:] = [_unfold_ci_chars_in_bracket(x) for x in xsb[ci_index//2:]] xs[1::2] = xsb # xsob: xs outside bracket xsob = xs[0::2] xsob_s_h, *xsob_s_t = xsob[ci_index//2].split("(?i)") - xsob_s_t = [_unfold_ci_chars(x, False) for x in xsob_s_t] + xsob_s_t = [_unfold_plaintext_ci_chars(x) for x in xsob_s_t] xsob[ci_index//2] = "".join([xsob_s_h] + xsob_s_t) - xsob[ci_index//2+1:] = [_unfold_ci_chars(x.replace("(?i)", ""), False) for x in xsob[ci_index//2+1:]] + xsob[ci_index//2+1:] = [_unfold_plaintext_ci_chars(x.replace("(?i)", "")) for x in xsob[ci_index//2+1:]] xs[0::2] = xsob p_unfolded = "".join(xs) @@ -361,19 +361,47 @@ def _unfold_case_insensitive_regex(regex_pattern): return regex_pattern -def _unfold_ci_chars(regex_pattern_segment, if_set): - # if_set: if all chars are in the square bracket of regex - def char_mapper(c): - if if_set: - return c.lower() + c.upper() +def _unfold_plaintext_ci_chars(regex_pattern_segment): + return "".join([f"[{x.lower()}{x.upper()}]" if x.isascii() and x.isalpha() else x for x in regex_pattern_segment]) + + +def _unfold_ci_chars_in_bracket(regex_pattern_in_bracket): + # split segments + segs = [""] + # effective i that knows skipped indexes/chars + ie = 0 + for i, x in enumerate(regex_pattern_in_bracket): + if i < ie: + continue + if i < len(regex_pattern_in_bracket)-2: + if x.isascii(): + ahead1 = regex_pattern_in_bracket[i+1] + ahead2 = regex_pattern_in_bracket[i+2] + if ahead1 == "-" and ahead2.isascii(): + segs.append(regex_pattern_in_bracket[i:i+3]) + segs.append("") + ie = i+3 + else: + segs[-1] = segs[-1] + x + else: + segs[-1] = segs[-1] + x + else: + segs.append(regex_pattern_in_bracket[i:len(regex_pattern_in_bracket)]) + break + segs_new = [] + for seg in segs: + if len(seg) == 3 and seg[1] == "-" and seg[0].isascii() and seg[0].isalpha() and seg[2].isascii() and seg[2].isalpha(): + lower = f"{seg[0].lower()}-{seg[2].lower()}" + if lower not in segs_new: + segs_new.append(lower) + upper = f"{seg[0].upper()}-{seg[2].upper()}" + if upper not in segs_new: + segs_new.append(upper) else: - return f"[{c.lower()}{c.upper()}]" - xs = list(regex_pattern_segment) - s = "".join([char_mapper(x) if x.isascii() and x.isalpha() else x for x in xs]) - if if_set: - # dedup for items inside square bracket - s = "".join(sorted(set(s))) - return s + new = "".join([x.lower()+x.upper() if x.isascii() and x.isalpha() else x for x in seg]) + if new not in segs_new: + segs_new.append(new) + return "[" + "".join(segs_new) + "]" def translate_pattern(pattern: Pattern, data_model_mapping, options): diff --git a/stix_shifter_modules/elastic_ecs/tests/stix_translation/test_elastic_ecs_stix_to_query.py b/stix_shifter_modules/elastic_ecs/tests/stix_translation/test_elastic_ecs_stix_to_query.py index 9002631df..33b8aeb4e 100644 --- a/stix_shifter_modules/elastic_ecs/tests/stix_translation/test_elastic_ecs_stix_to_query.py +++ b/stix_shifter_modules/elastic_ecs/tests/stix_translation/test_elastic_ecs_stix_to_query.py @@ -1,7 +1,8 @@ from stix_shifter.stix_translation import stix_translation from stix_shifter_utils.utils.error_response import ErrorCode from stix_shifter_modules.elastic_ecs.stix_translation.query_constructor import ( - _unfold_ci_chars, + _unfold_plaintext_ci_chars, + _unfold_ci_chars_in_bracket, _unfold_case_insensitive_regex, ) import unittest @@ -41,26 +42,35 @@ def _remove_timestamp_from_query(queries): class TestStixtoQuery(unittest.TestCase, object): def test_case_insensitive_unfold_chars(self): - input_output_pairs = [ ("a", False, "[aA]") - , ("ab", False, "[aA][bB]") - , ("ab7#*c((D))", False, "[aA][bB]7#*[cC](([dD]))") - , ("aba", True, "ABab") - , ("ab7#BBeE", True, "#7ABEabe") + input_output_pairs = [ ("a", "[aA]") + , ("ab", "[aA][bB]") + , ("ab7#*c((D))", "[aA][bB]7#*[cC](([dD]))") ] - for (x,y,z) in input_output_pairs: - assert z == _unfold_ci_chars(x, y) - + for (x,y) in input_output_pairs: + assert y == _unfold_plaintext_ci_chars(x) + + def test_unfold_ci_chars_in_bracket(self): + iopairs = [ ("abD", "[aAbBdD]") + , ("a-z0-9", "[a-zA-Z0-9]") + , ("-ef-z", "[-eEf-zF-Z]") + , ("ab-", "[aAbB-]") + , ("a-zA-Z0-9", "[a-zA-Z0-9]") + ] + for (x,y) in iopairs: + assert y == _unfold_ci_chars_in_bracket(x) def test_case_insensitive_unfold_regex(self): iopairs = [ ("http://z[abc]83m li", "http://z[abc]83m li") , ("(?i)virus", "[vV][iI][rR][uU][sS]") - , ("(?i)virus[ s]", "[vV][iI][rR][uU][sS][ Ss]") - , ("(?i)virus[ s] bin [c3b]", "[vV][iI][rR][uU][sS][ Ss] [bB][iI][nN] [3BCbc]") + , ("(?i)virus[ s]", "[vV][iI][rR][uU][sS][ sS]") + , ("(?i)virus[ s] bin [c3b]", "[vV][iI][rR][uU][sS][ sS] [bB][iI][nN] [cC3bB]") , (r"(?i)virus\[ s\]", r"[vV][iI][rR][uU][sS]\[ [sS]\]") - , (r"(?i)virus\\[ s\\]", r"[vV][iI][rR][uU][sS]\\[ Ss\\]") + , (r"(?i)virus\\[ s\\]", r"[vV][iI][rR][uU][sS]\\[ sS\\]") , ("(?i)http://z83m li", "[hH][tT][tT][pP]://[zZ]83[mM] [lL][iI]") - , ("(?i)http://z[abc]83m li", "[hH][tT][tT][pP]://[zZ][ABCabc]83[mM] [lL][iI]") - , ("http://(?i)z[abc]83m li", "http://[zZ][ABCabc]83[mM] [lL][iI]") + , ("(?i)http://z[abc]83m li", "[hH][tT][tT][pP]://[zZ][aAbBcC]83[mM] [lL][iI]") + , ("http://(?i)z[abc]83m li", "http://[zZ][aAbBcC]83[mM] [lL][iI]") + , ("http://(?i)z[a-z]83m li", "http://[zZ][a-zA-Z]83[mM] [lL][iI]") + , ("http://(?i)z[a-z0-9A-Z]83m li", "http://[zZ][a-zA-Z0-9]83[mM] [lL][iI]") ] for (x,y) in iopairs: assert y == _unfold_case_insensitive_regex(x)