[index configuration tool] Change to parsing Data Prepper pipeline YAML as input #215

Merged · 3 commits · Jun 27, 2023
Changes from all commits
3 changes: 1 addition & 2 deletions index_configuration_tool/Dockerfile
@@ -10,9 +10,8 @@ FROM python:3.11-slim
 WORKDIR /code
 # Copy only required dependencies
 COPY --from=builder /root/.local /root/.local
-# Copy only source code and Logstash grammar
+# Copy only source code
 COPY ./*.py .
-COPY logstash.lark .

 # update PATH
 ENV PATH=/root/.local:$PATH
39 changes: 0 additions & 39 deletions index_configuration_tool/logstash.lark

This file was deleted.

52 changes: 0 additions & 52 deletions index_configuration_tool/logstash_conf_parser.py

This file was deleted.

63 changes: 41 additions & 22 deletions index_configuration_tool/main.py
@@ -1,14 +1,16 @@
 import argparse
+import yaml
 from typing import Optional

 import index_operations
-import logstash_conf_parser as logstash_parser
 import utils

 # Constants
 SUPPORTED_ENDPOINTS = ["opensearch", "elasticsearch"]
+SOURCE_KEY = "source"
+SINK_KEY = "sink"
 HOSTS_KEY = "hosts"
-USER_KEY = "user"
+USER_KEY = "username"
 PWD_KEY = "password"

@@ -19,10 +21,9 @@ def get_auth(input_data: dict) -> Optional[tuple]:


 def get_endpoint_info(plugin_config: dict) -> tuple:
-    endpoint = "https://" if ("ssl" in plugin_config and plugin_config["ssl"]) else "http://"
     # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
     # This tool needs one accessible host, so we pick the first entry in the latter case.
-    endpoint += plugin_config[HOSTS_KEY][0] if type(plugin_config[HOSTS_KEY]) is list else plugin_config[HOSTS_KEY]
+    endpoint = plugin_config[HOSTS_KEY][0] if type(plugin_config[HOSTS_KEY]) is list else plugin_config[HOSTS_KEY]
     endpoint += "/"
     return endpoint, get_auth(plugin_config)
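
For illustration, a minimal sketch of the updated endpoint resolution (hypothetical values; get_auth is assumed to return a (user, password) tuple when credentials are present). Data Prepper host entries already include the scheme, so the old http/https prefixing is gone:

    # Hypothetical usage of the updated get_endpoint_info
    plugin_config = {
        "hosts": ["https://os_host:9200", "https://backup_host:9200"],
        "username": "admin",
        "password": "admin",
    }
    endpoint, auth = get_endpoint_info(plugin_config)
    # endpoint -> "https://os_host:9200/"  (first host entry, trailing slash appended)
    # auth     -> ("admin", "admin")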

@@ -32,13 +33,28 @@ def fetch_all_indices_by_plugin(plugin_config: dict) -> dict:
     return index_operations.fetch_all_indices(endpoint, auth_tuple)


+def check_supported_endpoint(config: dict) -> Optional[tuple]:
+    for supported_type in SUPPORTED_ENDPOINTS:
+        if supported_type in config:
+            return supported_type, config[supported_type]
+
+
 def get_supported_endpoint(config: dict, key: str) -> tuple:
-    # The value of each key is a list of plugin configs.
-    # Each config is a tuple, where the first value is the endpoint type.
-    supported_endpoint = next((p for p in config[key] if p[0] in SUPPORTED_ENDPOINTS), None)
-    if not supported_endpoint:
+    # The value of each key may be a single plugin (as a dict)
+    # or a list of plugin configs
+    supported_tuple = tuple()
+    if type(config[key]) is dict:
+        supported_tuple = check_supported_endpoint(config[key])
+    elif type(config[key]) is list:
+        for entry in config[key]:
+            supported_tuple = check_supported_endpoint(entry)
+            # Break out of the loop at the first supported type
+            if supported_tuple:
+                break
+    if not supported_tuple:
         raise ValueError("Could not find any supported endpoints in section: " + key)
-    return supported_endpoint
+    # First tuple value is the name, second value is the config dict
+    return supported_tuple[0], supported_tuple[1]


 def validate_plugin_config(config: dict, key: str):
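
A rough sketch of the reworked lookup (hypothetical configs): check_supported_endpoint scans a single plugin mapping for the first supported type, while get_supported_endpoint now accepts either shape, a lone plugin dict (as source typically is) or a list of plugins (as sink is):

    # Hypothetical inputs exercising both accepted shapes
    single_plugin = {"elasticsearch": {"hosts": ["http://host1"]}}
    plugin_list = [{"sink1": {"num_array": [0]}},
                   {"opensearch": {"hosts": ["https://os_host"]}}]

    get_supported_endpoint({"source": single_plugin}, "source")
    # -> ("elasticsearch", {"hosts": ["http://host1"]})
    get_supported_endpoint({"sink": plugin_list}, "sink")
    # -> ("opensearch", {"hosts": ["https://os_host"]}), the first supported entry wins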
@@ -53,11 +69,11 @@ def validate_plugin_config(config: dict, key: str):
raise ValueError("Invalid auth configuration (Password without user) for endpoint: " + supported_endpoint[0])


def validate_logstash_config(config: dict):
if "input" not in config or "output" not in config:
raise ValueError("Missing input or output data from Logstash config")
validate_plugin_config(config, "input")
validate_plugin_config(config, "output")
def validate_pipeline_config(config: dict):
if SOURCE_KEY not in config or SINK_KEY not in config:
raise ValueError("Missing source or sink configuration in Data Prepper pipeline YAML")
kartg marked this conversation as resolved.
Show resolved Hide resolved
validate_plugin_config(config, SOURCE_KEY)
validate_plugin_config(config, SINK_KEY)


# Computes differences in indices between source and target.
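
A quick sketch of the renamed validation under the new keys (hypothetical input, names as in main.py above): a pipeline definition with a source but no sink now fails fast:

    # Hypothetical: source present, sink missing
    try:
        validate_pipeline_config({"source": {"elasticsearch": {"hosts": ["http://host1"]}}})
    except ValueError as err:
        print(err)  # Missing source or sink configuration in Data Prepper pipeline YAML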
@@ -92,15 +108,18 @@ def print_report(index_differences: tuple[set, set, set]):  # pragma no cover


 def run(config_file_path: str) -> None:
-    # Parse and validate logstash config file
-    logstash_config = logstash_parser.parse(config_file_path)
-    validate_logstash_config(logstash_config)
+    # Parse and validate pipelines YAML file
+    with open(config_file_path, 'r') as pipeline_file:
+        dp_config = yaml.safe_load(pipeline_file)
+    # We expect the Data Prepper pipeline to only have a single top-level value
+    pipeline_config = next(iter(dp_config.values()))
+    validate_pipeline_config(pipeline_config)
     # Endpoint is a tuple of (type, config)
-    endpoint = get_supported_endpoint(logstash_config, "input")
+    endpoint = get_supported_endpoint(pipeline_config, SOURCE_KEY)
     # Fetch all indices from source cluster
     source_indices = fetch_all_indices_by_plugin(endpoint[1])
     # Fetch all indices from target cluster
-    endpoint = get_supported_endpoint(logstash_config, "output")
+    endpoint = get_supported_endpoint(pipeline_config, SINK_KEY)
     target_endpoint, target_auth = get_endpoint_info(endpoint[1])
     target_indices = index_operations.fetch_all_indices(target_endpoint, target_auth)
     # Compute index differences and print report
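
The "single top-level value" assumption above can be illustrated with a small self-contained sketch (hypothetical pipeline name and hosts; PyYAML accepts the uniform leading indentation inside the string):

    import yaml

    raw = """
    test-pipeline:
      source:
        elasticsearch:
          hosts: ["http://host1"]
      sink:
        - opensearch:
            hosts: ["https://os_host"]
    """
    dp_config = yaml.safe_load(raw)
    # One top-level key (the pipeline name) maps to the whole pipeline definition,
    # so the first and only value holds the source/sink configuration
    pipeline_config = next(iter(dp_config.values()))
    assert "source" in pipeline_config and "sink" in pipeline_config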
@@ -119,16 +138,16 @@ def run(config_file_path: str) -> None:
     arg_parser = argparse.ArgumentParser(
         prog="python main.py",
         description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" +
-                    "The source and target endpoints are obtained by parsing a Logstash config file, which is the " +
-                    "sole expected argument for this module.\nAlso prints a report of the indices to be created, " +
+                    "The source and target endpoints are obtained by parsing a Data Prepper pipelines YAML file, which " +
+                    "is the sole expected argument for this module.\nAlso prints a report of the indices to be created, " +
                     "along with indices that are identical or have conflicting settings/mappings.\nIn case of the " +
                     "latter, no action will be taken on the target cluster.",
         formatter_class=argparse.RawTextHelpFormatter
     )
     # This tool only takes one argument
     arg_parser.add_argument(
         "config_file_path",
-        help="Path to the Logstash config file to parse for source and target endpoint information"
+        help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information"
     )
     args = arg_parser.parse_args()
     print("\n##### Starting index configuration tool... #####\n")
2 changes: 1 addition & 1 deletion index_configuration_tool/requirements.txt
@@ -1,4 +1,4 @@
 jsondiff>=2.0.0
-lark>=1.1.5
+pyyaml>=6.0
 requests>=2.28.2
 responses>=0.23.1
Binary file index_configuration_tool/tests/resources/expected_parse_output.pickle not shown.

index_configuration_tool/tests/resources/logstash_test_input.conf

This file was deleted.

21 changes: 21 additions & 0 deletions index_configuration_tool/tests/resources/test_pipeline_input.yaml
@@ -0,0 +1,21 @@
+test-pipeline-input:
+  source:
+    elasticsearch:
+      hosts: ["http://host1", "http://host2"]
+      username: "test_user"
+      password: "password"
+  processor:
+    - plugin1:
+        str_array: ["abc", "x y z"]
+        obj_array:
+          - key: "key1"
+            value: "val1"
+          - key: "key2"
+            value: "val 2"
+  sink:
+    - sink1:
+        num_array: [0]
+    - opensearch:
+        hosts: ["https://os_host"]
+        username: "test_user"
+        password: "test"
15 changes: 12 additions & 3 deletions index_configuration_tool/tests/test_constants.py
@@ -2,8 +2,8 @@
 from os.path import dirname

 TEST_RESOURCES_SUBPATH = "/resources/"
-LOGSTASH_RAW_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "logstash_test_input.conf"
-LOGSTASH_PICKLE_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "expected_parse_output.pickle"
+PIPELINE_CONFIG_RAW_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "test_pipeline_input.yaml"
+PIPELINE_CONFIG_PICKLE_FILE_PATH = dirname(__file__) + TEST_RESOURCES_SUBPATH + "expected_parse_output.pickle"

 INDEX1_NAME = "index1"
 INDEX2_NAME = "index2"
@@ -51,6 +51,15 @@
         }
     }
 }
-# Based on the contents of logstash_test_input.conf
+# Based on the contents of test_pipeline_input.yaml
 SOURCE_ENDPOINT = "http://host1/"
 TARGET_ENDPOINT = "https://os_host/"
+
+# Utility logic to update the pickle file if/when the input file is updated
+# import yaml
+# import pickle
+# if __name__ == '__main__':
+#     with open(PIPELINE_CONFIG_RAW_FILE_PATH, 'r') as test_input:
+#         test_config = yaml.safe_load(test_input)
+#     with open(PIPELINE_CONFIG_PICKLE_FILE_PATH, 'wb') as out:
+#         pickle.dump(test_config, out)