diff --git a/index_configuration_tool/Dockerfile b/index_configuration_tool/Dockerfile index e73e4c246..03abf8581 100644 --- a/index_configuration_tool/Dockerfile +++ b/index_configuration_tool/Dockerfile @@ -10,9 +10,8 @@ FROM python:3.11-slim WORKDIR /code # Copy only required dependencies COPY --from=builder /root/.local /root/.local -# Copy only source code and Logstash grammar +# Copy only source code COPY ./*.py . -COPY logstash.lark . # update PATH ENV PATH=/root/.local:$PATH diff --git a/index_configuration_tool/logstash.lark b/index_configuration_tool/logstash.lark deleted file mode 100644 index ea8096a0e..000000000 --- a/index_configuration_tool/logstash.lark +++ /dev/null @@ -1,39 +0,0 @@ -// Adapted from https://github.com/bpaquet/node-logstash/blob/master/lib/logstash_config.jison -// Tested via https://www.lark-parser.org/ide -// TODO add if/else support -?start: config_section (config_section)* - -?config_section: type "{" plugins? "}" - -plugins: plugin (plugin)* - -plugin: key "{" plugin_params? "}" - -plugin_params: param (param)* - -param: key "=>" value - -?value: string_literal - | number - | "true" -> true - | "false" -> false - | list - -?list : "[" [member ("," member)*] "]" -?member: string_literal | number -type: PLUGIN_TYPE -key: STRING -string_literal: ESCAPED_STRING -number: SIGNED_INT -PLUGIN_TYPE: "input" | "filter" | "output" -STRING: (DIGIT|LETTER|"_"|"-"|".")+ -COMMENT: "#"/.*/ - -%import common.ESCAPED_STRING -%import common.SIGNED_INT -%import common.DIGIT -%import common.LETTER -%import common.WS - -%ignore WS -%ignore COMMENT \ No newline at end of file diff --git a/index_configuration_tool/logstash_conf_parser.py b/index_configuration_tool/logstash_conf_parser.py deleted file mode 100644 index 827453c69..000000000 --- a/index_configuration_tool/logstash_conf_parser.py +++ /dev/null @@ -1,52 +0,0 @@ -import sys - -from lark import Lark -from lark import Transformer - - -# The names of each function in the Transformer corresponds -# to -class LogstashTransformer(Transformer): - def var_name(self, v: list) -> str: - (v,) = v - return v.value - - def string_literal(self, s: list) -> str: - s = self.var_name(s) - # Remove surrounding quotes - return s[1:-1] - - def number(self, n: list) -> int: - (n,) = n - return int(n) - - def true(self, b) -> bool: - return True - - def false(self, b) -> bool: - return False - - # The same logic is applied for both rules - key = var_name - type = var_name - # These rules can be transformed directly to a corresponding Python type - start = dict - config_section = tuple - plugin_params = dict - list = list - param = tuple - plugin = tuple - plugins = list - - -logstash_parser = Lark.open("logstash.lark", rel_to=__file__, parser="lalr", transformer=LogstashTransformer()) - - -def parse(logstash_file: str) -> dict: - with open(logstash_file, "r") as conf_file: - return logstash_parser.parse(conf_file.read()) - - -if __name__ == '__main__': # pragma no cover - val = parse(sys.argv[1]) - print(val) diff --git a/index_configuration_tool/main.py b/index_configuration_tool/main.py index 938c3ba5c..944354068 100644 --- a/index_configuration_tool/main.py +++ b/index_configuration_tool/main.py @@ -1,14 +1,16 @@ import argparse +import yaml from typing import Optional import index_operations -import logstash_conf_parser as logstash_parser import utils # Constants SUPPORTED_ENDPOINTS = ["opensearch", "elasticsearch"] +SOURCE_KEY = "source" +SINK_KEY = "sink" HOSTS_KEY = "hosts" -USER_KEY = "user" +USER_KEY = "username" PWD_KEY = "password" @@ -19,10 +21,9 @@ def get_auth(input_data: dict) -> Optional[tuple]: def get_endpoint_info(plugin_config: dict) -> tuple: - endpoint = "https://" if ("ssl" in plugin_config and plugin_config["ssl"]) else "http://" # "hosts" can be a simple string, or an array of hosts for Logstash to hit. # This tool needs one accessible host, so we pick the first entry in the latter case. - endpoint += plugin_config[HOSTS_KEY][0] if type(plugin_config[HOSTS_KEY]) is list else plugin_config[HOSTS_KEY] + endpoint = plugin_config[HOSTS_KEY][0] if type(plugin_config[HOSTS_KEY]) is list else plugin_config[HOSTS_KEY] endpoint += "/" return endpoint, get_auth(plugin_config) @@ -32,13 +33,28 @@ def fetch_all_indices_by_plugin(plugin_config: dict) -> dict: return index_operations.fetch_all_indices(endpoint, auth_tuple) +def check_supported_endpoint(config: dict) -> Optional[tuple]: + for supported_type in SUPPORTED_ENDPOINTS: + if supported_type in config: + return supported_type, config[supported_type] + + def get_supported_endpoint(config: dict, key: str) -> tuple: - # The value of each key is a list of plugin configs. - # Each config is a tuple, where the first value is the endpoint type. - supported_endpoint = next((p for p in config[key] if p[0] in SUPPORTED_ENDPOINTS), None) - if not supported_endpoint: + # The value of each key may be a single plugin (as a dict) + # or a list of plugin configs + supported_tuple = tuple() + if type(config[key]) is dict: + supported_tuple = check_supported_endpoint(config[key]) + elif type(config[key]) is list: + for entry in config[key]: + supported_tuple = check_supported_endpoint(entry) + # Break out of the loop at the first supported type + if supported_tuple: + break + if not supported_tuple: raise ValueError("Could not find any supported endpoints in section: " + key) - return supported_endpoint + # First tuple value is the name, second value is the config dict + return supported_tuple[0], supported_tuple[1] def validate_plugin_config(config: dict, key: str): @@ -53,11 +69,11 @@ def validate_plugin_config(config: dict, key: str): raise ValueError("Invalid auth configuration (Password without user) for endpoint: " + supported_endpoint[0]) -def validate_logstash_config(config: dict): - if "input" not in config or "output" not in config: - raise ValueError("Missing input or output data from Logstash config") - validate_plugin_config(config, "input") - validate_plugin_config(config, "output") +def validate_pipeline_config(config: dict): + if SOURCE_KEY not in config or SINK_KEY not in config: + raise ValueError("Missing source or sink configuration in Data Prepper pipeline YAML") + validate_plugin_config(config, SOURCE_KEY) + validate_plugin_config(config, SINK_KEY) # Computes differences in indices between source and target. @@ -92,15 +108,18 @@ def print_report(index_differences: tuple[set, set, set]): # pragma no cover def run(config_file_path: str) -> None: - # Parse and validate logstash config file - logstash_config = logstash_parser.parse(config_file_path) - validate_logstash_config(logstash_config) + # Parse and validate pipelines YAML file + with open(config_file_path, 'r') as pipeline_file: + dp_config = yaml.safe_load(pipeline_file) + # We expect the Data Prepper pipeline to only have a single top-level value + pipeline_config = next(iter(dp_config.values())) + validate_pipeline_config(pipeline_config) # Endpoint is a tuple of (type, config) - endpoint = get_supported_endpoint(logstash_config, "input") + endpoint = get_supported_endpoint(pipeline_config, SOURCE_KEY) # Fetch all indices from source cluster source_indices = fetch_all_indices_by_plugin(endpoint[1]) # Fetch all indices from target cluster - endpoint = get_supported_endpoint(logstash_config, "output") + endpoint = get_supported_endpoint(pipeline_config, SINK_KEY) target_endpoint, target_auth = get_endpoint_info(endpoint[1]) target_indices = index_operations.fetch_all_indices(target_endpoint, target_auth) # Compute index differences and print report @@ -119,8 +138,8 @@ def run(config_file_path: str) -> None: arg_parser = argparse.ArgumentParser( prog="python main.py", description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" + - "The source and target endpoints are obtained by parsing a Logstash config file, which is the " + - "sole expected argument for this module.\nAlso prints a report of the indices to be created, " + + "The source and target endpoints are obtained by parsing a Data Prepper pipelines YAML file, which " + + "is the sole expected argument for this module.\nAlso prints a report of the indices to be created, " + "along with indices that are identical or have conflicting settings/mappings.\nIn case of the " + "latter, no action will be taken on the target cluster.", formatter_class=argparse.RawTextHelpFormatter @@ -128,7 +147,7 @@ def run(config_file_path: str) -> None: # This tool only takes one argument arg_parser.add_argument( "config_file_path", - help="Path to the Logstash config file to parse for source and target endpoint information" + help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information" ) args = arg_parser.parse_args() print("\n##### Starting index configuration tool... #####\n") diff --git a/index_configuration_tool/requirements.txt b/index_configuration_tool/requirements.txt index 51e8144a1..1a7d17eef 100644 --- a/index_configuration_tool/requirements.txt +++ b/index_configuration_tool/requirements.txt @@ -1,4 +1,4 @@ jsondiff>=2.0.0 -lark>=1.1.5 +pyyaml>=6.0 requests>=2.28.2 responses>=0.23.1 \ No newline at end of file diff --git a/index_configuration_tool/tests/resources/expected_parse_output.pickle b/index_configuration_tool/tests/resources/expected_parse_output.pickle index 2bb22976b..568cdae3f 100644 Binary files a/index_configuration_tool/tests/resources/expected_parse_output.pickle and b/index_configuration_tool/tests/resources/expected_parse_output.pickle differ diff --git a/index_configuration_tool/tests/resources/logstash_test_input.conf b/index_configuration_tool/tests/resources/logstash_test_input.conf deleted file mode 100644 index 24caef779..000000000 --- a/index_configuration_tool/tests/resources/logstash_test_input.conf +++ /dev/null @@ -1,40 +0,0 @@ -input { - - plugin1 { - string_key => "string value" - bool_key => true - num_key => 1 - } - - plugin2 { - neg_key => -1 - } - - elasticsearch { - hosts => ["host1", "host2"] - user => "test_user" - password => "password" - } -} - -filter { - plugin3 { - str_array => ["abc", "x y z"] - } -} - -output { - plugin4 {} - - plugin5 { - num_array => [0] - } - - opensearch { - hosts => "os_host" - ssl => "true" - user => "test_user" - password => "test" - } -} - diff --git a/index_configuration_tool/tests/resources/test_pipeline_input.yaml b/index_configuration_tool/tests/resources/test_pipeline_input.yaml new file mode 100644 index 000000000..7c33a7e21 --- /dev/null +++ b/index_configuration_tool/tests/resources/test_pipeline_input.yaml @@ -0,0 +1,21 @@ +test-pipeline-input: + source: + elasticsearch: + hosts: ["http://host1", "http://host2"] + username: "test_user" + password: "password" + processor: + - plugin1: + str_array: ["abc", "x y z"] + obj_array: + - key: "key1" + value: "val1" + - key: "key2" + value: "val 2" + sink: + - sink1: + num_array: [0] + - opensearch: + hosts: ["https://os_host"] + username: "test_user" + password: "test" diff --git a/index_configuration_tool/tests/test_constants.py b/index_configuration_tool/tests/test_constants.py index a6325423a..c4cd4d51f 100644 --- a/index_configuration_tool/tests/test_constants.py +++ b/index_configuration_tool/tests/test_constants.py @@ -2,8 +2,8 @@ from os.path import dirname TEST_RESOURCES_SUBPATH = "/resources/" -LOGSTASH_RAW_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "logstash_test_input.conf" -LOGSTASH_PICKLE_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "expected_parse_output.pickle" +PIPELINE_CONFIG_RAW_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "test_pipeline_input.yaml" +PIPELINE_CONFIG_PICKLE_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "expected_parse_output.pickle" INDEX1_NAME = "index1" INDEX2_NAME = "index2" @@ -51,6 +51,15 @@ } } } -# Based on the contents of logstash_test_input.conf +# Based on the contents of test_pipeline_input.yaml SOURCE_ENDPOINT = "http://host1/" TARGET_ENDPOINT = "https://os_host/" + +# Utility logic to update the pickle file if/when the input file is updated +# import yaml +# import pickle +# if __name__ == '__main__': +# with open(PIPELINE_CONFIG_RAW_FILE_PATH, 'r') as test_input: +# test_config = yaml.safe_load(test_input) +# with open(PIPELINE_CONFIG_PICKLE_FILE_PATH, 'wb') as out: +# pickle.dump(test_config, out) diff --git a/index_configuration_tool/tests/test_logstash_parser.py b/index_configuration_tool/tests/test_logstash_parser.py deleted file mode 100644 index 073f0c5c8..000000000 --- a/index_configuration_tool/tests/test_logstash_parser.py +++ /dev/null @@ -1,73 +0,0 @@ -import pickle -import unittest - -import lark.exceptions -from jsondiff import diff - -from logstash_conf_parser import logstash_parser, parse -from tests import test_constants - - -class TestLogstashParser(unittest.TestCase): - # Run before each test - def setUp(self) -> None: - with open(test_constants.LOGSTASH_PICKLE_FILE_PATH, "rb") as f: - # The root DS is a dict, with input type as key. - # The value of each key is an array of inputs. - # Each input is a tuple of plugin name and data, - # where the data is a dict of key-value pairs. - self.test_data = pickle.load(f) - - # Test input json should match loaded pickle data - def test_parser_happy_case(self): - actual = parse(test_constants.LOGSTASH_RAW_FILE_PATH) - test_diff = diff(self.test_data, actual) - # Validate that diff is empty - self.assertEqual(test_diff, dict()) - - def test_bad_configs(self): - # Checks for: - # - Empty config - # - Section should begin with type name - # - Invalid type - # - Valid type but no params - bad_configs = ["", "{}", "bad {}", "input"] - for config in bad_configs: - self.assertRaises(lark.exceptions.UnexpectedToken, logstash_parser.parse, config) - - # Note that while these are considered valid Logstash configurations, - # main.py considers them incomplete and would fail when validating them. - def test_empty_config_can_be_parsed(self): - logstash_parser.parse("input {}") - logstash_parser.parse("filter {}") - logstash_parser.parse("output {}") - - def test_string(self): - val = self.test_data["input"][0][1]["string_key"] - self.assertEqual(str, type(val)) - self.assertTrue(len(val) > 0) - - def test_bool(self): - val = self.test_data["input"][0][1]["bool_key"] - self.assertEqual(bool, type(val)) - self.assertTrue(val) - - def test_num(self): - num = self.test_data["input"][0][1]["num_key"] - neg_num = self.test_data["input"][1][1]["neg_key"] - self.assertEqual(int, type(num)) - self.assertEqual(1, num) - self.assertEqual(int, type(neg_num)) - self.assertEqual(-1, neg_num) - - -# Utility method to update the expected output pickle -# file if/when the input conf file is changed. -def __update_output_pickle(): - with open(test_constants.LOGSTASH_PICKLE_FILE_PATH, "wb") as out: - val = parse(test_constants.LOGSTASH_RAW_FILE_PATH) - pickle.dump(val, out) - - -if __name__ == '__main__': - unittest.main() diff --git a/index_configuration_tool/tests/test_main.py b/index_configuration_tool/tests/test_main.py index e4374615e..cde419328 100644 --- a/index_configuration_tool/tests/test_main.py +++ b/index_configuration_tool/tests/test_main.py @@ -11,24 +11,18 @@ # Constants TEST_KEY = "test_key" BASE_CONFIG_SECTION = { - TEST_KEY: [("invalid_plugin1", None), ("invalid_plugin2", {})] + TEST_KEY: [{"invalid_plugin1": {"key": "val"}}, {"invalid_plugin2": {}}] } # Utility method to create a test plugin config def create_plugin_config(host_list: list[str], - ssl: Optional[bool] = None, user: Optional[str] = None, password: Optional[str] = None) -> dict: config = dict() - if len(host_list) == 1: - config["hosts"] = host_list[0] - else: - config["hosts"] = host_list - if ssl: - config["ssl"] = ssl + config["hosts"] = host_list if user: - config["user"] = user + config["username"] = user if password: config["password"] = password return config @@ -36,7 +30,8 @@ def create_plugin_config(host_list: list[str], # Utility method to creat a test config section def create_config_section(plugin_config: dict) -> dict: - valid_plugin = (random.choice(main.SUPPORTED_ENDPOINTS), plugin_config) + valid_plugin = dict() + valid_plugin[random.choice(main.SUPPORTED_ENDPOINTS)] = plugin_config config_section = copy.deepcopy(BASE_CONFIG_SECTION) config_section[TEST_KEY].append(valid_plugin) return config_section @@ -45,53 +40,50 @@ def create_config_section(plugin_config: dict) -> dict: class TestMain(unittest.TestCase): # Run before each test def setUp(self) -> None: - with open(test_constants.LOGSTASH_PICKLE_FILE_PATH, "rb") as f: - self.loaded_logstash_config = pickle.load(f) + with open(test_constants.PIPELINE_CONFIG_PICKLE_FILE_PATH, "rb") as f: + self.loaded_pipeline_config = pickle.load(f) def test_get_auth_returns_none(self): # The following inputs should not return an auth tuple: # - Empty input # - user without password # - password without user - input_list = [{}, {"user": "test"}, {"password": "test"}] + input_list = [{}, {"username": "test"}, {"password": "test"}] for test_input in input_list: self.assertIsNone(main.get_auth(test_input)) def test_get_auth_for_valid_input(self): # Test valid input - result = main.get_auth({"user": "user", "password": "pass"}) + result = main.get_auth({"username": "user", "password": "pass"}) self.assertEqual(tuple, type(result)) self.assertEqual("user", result[0]) self.assertEqual("pass", result[1]) def test_get_endpoint_info(self): + host_input = "test" + expected_endpoint = "test/" test_user = "user" test_password = "password" # Simple base case - test_config = create_plugin_config(["test"]) + test_config = create_plugin_config([host_input]) result = main.get_endpoint_info(test_config) - self.assertEqual("http://test/", result[0]) + self.assertEqual(expected_endpoint, result[0]) self.assertIsNone(result[1]) - # SSL enabled - test_config = create_plugin_config(["test"], True) + # Invalid auth config + test_config = create_plugin_config([host_input], test_user) result = main.get_endpoint_info(test_config) - self.assertEqual("https://test/", result[0]) + self.assertEqual(expected_endpoint, result[0]) self.assertIsNone(result[1]) - # SSL disabled, invalid auth config - test_config = create_plugin_config(["test"], False, test_user) + # Valid auth config + test_config = create_plugin_config([host_input], user=test_user, password=test_password) result = main.get_endpoint_info(test_config) - self.assertEqual("http://test/", result[0]) - self.assertIsNone(result[1]) - # SSL disabled, valid auth config - test_config = create_plugin_config(["test"], user=test_user, password=test_password) - result = main.get_endpoint_info(test_config) - self.assertEqual("http://test/", result[0]) + self.assertEqual(expected_endpoint, result[0]) self.assertEqual(test_user, result[1][0]) self.assertEqual(test_password, result[1][1]) # Array of hosts uses the first entry - test_config = create_plugin_config(["test1", "test2"], True, test_user, test_password) + test_config = create_plugin_config([host_input, "other_host"], test_user, test_password) result = main.get_endpoint_info(test_config) - self.assertEqual("https://test1/", result[0]) + self.assertEqual(expected_endpoint, result[0]) self.assertEqual(test_user, result[1][0]) self.assertEqual(test_password, result[1][1]) @@ -184,22 +176,24 @@ def test_validate_plugin_config_bad_auth_user(self): self.assertRaises(ValueError, main.validate_plugin_config, test_data, TEST_KEY) def test_validate_plugin_config_happy_case(self): - plugin_config = create_plugin_config(["host"], True, "user", "password") + plugin_config = create_plugin_config(["host"], "user", "password") test_data = create_config_section(plugin_config) # Should complete without errors main.validate_plugin_config(test_data, TEST_KEY) - def test_validate_logstash_config_missing_required_keys(self): + def test_validate_pipeline_config_missing_required_keys(self): # Test cases: # - Empty input # - missing output # - missing input bad_configs = [{}, {"input": ()}, {"output": ()}] for config in bad_configs: - self.assertRaises(ValueError, main.validate_logstash_config, config) + self.assertRaises(ValueError, main.validate_pipeline_config, config) - def test_validate_logstash_config_happy_case(self): - main.validate_logstash_config(self.loaded_logstash_config) + def test_validate_pipeline_config_happy_case(self): + # Get top level value + test_config = next(iter(self.loaded_pipeline_config.values())) + main.validate_pipeline_config(test_config) @patch('main.print_report') @patch('index_operations.create_indices') @@ -222,7 +216,7 @@ def test_run(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock index_settings[test_constants.INDEX_KEY][test_constants.NUM_REPLICAS_SETTING] += 1 # Fetch indices is called first for source, then for target mock_fetch_indices.side_effect = [test_constants.BASE_INDICES_DATA, target_indices_data] - main.run(test_constants.LOGSTASH_RAW_FILE_PATH) + main.run(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH) mock_create_indices.assert_called_once_with(expected_create_payload, test_constants.TARGET_ENDPOINT, ANY) mock_print_report.assert_called_once_with(expected_diff)