From 361edef423e1837dba26373497a8d5f0cb025dda Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 30 Aug 2023 14:17:02 -0700 Subject: [PATCH] Incorporated PR feedback from @mikaylathompson Signed-off-by: Kartik Ganesh --- .../index_configuration_tool/monitor.py | 34 +++++++++------ .../tests/test_monitor.py | 42 +++++++++++++++++-- 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/FetchMigration/index_configuration_tool/monitor.py b/FetchMigration/index_configuration_tool/monitor.py index 3c393a382..7f9f3ba61 100644 --- a/FetchMigration/index_configuration_tool/monitor.py +++ b/FetchMigration/index_configuration_tool/monitor.py @@ -20,9 +20,13 @@ def shutdown_pipeline(endpoint: EndpointInfo): requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl) -def fetch_prometheus_metrics(endpoint: EndpointInfo) -> List[Metric]: +def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]: metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT - response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl) + try: + response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl) + response.raise_for_status() + except requests.exceptions.RequestException: + return None # Based on response headers defined in Data Prepper's PrometheusMetricsHandler.java class metrics = response.content.decode('utf-8') # Collect generator return values into list @@ -55,17 +59,20 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None: endpoint = EndpointInfo(args.dp_endpoint, default_auth, False) prev_no_partitions_count = 0 terminal = False - target_doc_count = int(args.target_count) while not terminal: + # If the API call fails, the response is empty metrics = fetch_prometheus_metrics(endpoint) - success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC) - rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC) - no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC) - terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, - prev_no_partitions_count, target_doc_count) + if metrics is not None: + success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC) + rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC) + no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC) + terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, + prev_no_partitions_count, args.target_count) + if not terminal: + # Save no_partitions_count + prev_no_partitions_count = no_partitions_count + if not terminal: - # Save no_partitions_count - prev_no_partitions_count = no_partitions_count time.sleep(wait_seconds) # Loop terminated, shut down the Data Prepper pipeline shutdown_pipeline(endpoint) @@ -75,9 +82,9 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None: # Set up parsing for command line arguments arg_parser = argparse.ArgumentParser( prog="python monitor.py", - description="Monitoring process for a running Data Prepper pipeline.\n" + - "The first input is the Data Prepper URL endpoint.\n" + - "The second input is the target doc_count for termination.", + description="""Monitoring process for a running Data Prepper pipeline. + The first input is the Data Prepper URL endpoint. + The second input is the target doc_count for termination.""", formatter_class=argparse.RawTextHelpFormatter ) # Required positional arguments @@ -87,6 +94,7 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None: ) arg_parser.add_argument( "target_count", + type=int, help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated" ) cli_args = arg_parser.parse_args() diff --git a/FetchMigration/index_configuration_tool/tests/test_monitor.py b/FetchMigration/index_configuration_tool/tests/test_monitor.py index 689846e70..e52d41c62 100644 --- a/FetchMigration/index_configuration_tool/tests/test_monitor.py +++ b/FetchMigration/index_configuration_tool/tests/test_monitor.py @@ -2,6 +2,8 @@ import unittest from unittest.mock import patch, MagicMock, PropertyMock +import requests +import responses from prometheus_client.parser import text_string_to_metric_families import monitor @@ -20,7 +22,6 @@ class TestMonitor(unittest.TestCase): @patch('requests.post') - # Note that mock objects are passed bottom-up from the patch order above def test_shutdown(self, mock_post: MagicMock): expected_shutdown_url = TEST_ENDPOINT + "/shutdown" test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG) @@ -28,7 +29,6 @@ def test_shutdown(self, mock_post: MagicMock): mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG) @patch('requests.get') - # Note that mock objects are passed bottom-up from the patch order above def test_fetch_prometheus_metrics(self, mock_get: MagicMock): expected_url = TEST_ENDPOINT + "/metrics/prometheus" # Set up GET response @@ -51,6 +51,15 @@ def test_fetch_prometheus_metrics(self, mock_get: MagicMock): self.assertEqual(TEST_METRIC_VALUE, test_sample.value) self.assertTrue(len(test_sample.labels) > 0) + @responses.activate + def test_fetch_prometheus_metrics_failure(self): + # Set up expected GET call with a mock exception + expected_url = TEST_ENDPOINT + "/metrics/prometheus" + responses.get(expected_url, body=requests.Timeout()) + # Test fetch + result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT)) + self.assertIsNone(result) + def test_get_metric_value(self): # Return value is an int expected_val = int(TEST_METRIC_VALUE) @@ -74,7 +83,6 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic # The values here don't matter since we've mocked the check method test_input.dp_endpoint = "test" test_input.target_count = 1 - mock_fetch.return_value = None mock_get.return_value = None # Check will first fail, then pass mock_check.side_effect = [False, True] @@ -83,6 +91,34 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic monitor.run(test_input, wait_time) # Test that fetch was called with the expected EndpointInfo expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False) + self.assertEqual(2, mock_fetch.call_count) + mock_fetch.assert_called_with(expected_endpoint_info) + # We expect one wait cycle + mock_sleep.assert_called_once_with(wait_time) + mock_shut.assert_called_once_with(expected_endpoint_info) + + @patch('monitor.shutdown_pipeline') + @patch('time.sleep') + @patch('monitor.check_if_complete') + @patch('monitor.get_metric_value') + @patch('monitor.fetch_prometheus_metrics') + # Note that mock objects are passed bottom-up from the patch order above + def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, + mock_sleep: MagicMock, mock_shut: MagicMock): + test_input = argparse.Namespace() + # The values here don't matter since we've mocked the check method + test_input.dp_endpoint = "test" + test_input.target_count = 1 + mock_get.return_value = None + mock_check.return_value = True + # Fetch call will first fail, then succeed + mock_fetch.side_effect = [None, MagicMock()] + # Run test method + wait_time = 3 + monitor.run(test_input, wait_time) + # Test that fetch was called with the expected EndpointInfo + expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False) + self.assertEqual(2, mock_fetch.call_count) mock_fetch.assert_called_with(expected_endpoint_info) # We expect one wait cycle mock_sleep.assert_called_once_with(wait_time)