Skip to content

Commit

Permalink
Incorporated PR feedback from @mikaylathompson
Browse files Browse the repository at this point in the history
Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Aug 30, 2023
1 parent 31ea718 commit 361edef
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 16 deletions.
34 changes: 21 additions & 13 deletions FetchMigration/index_configuration_tool/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ def shutdown_pipeline(endpoint: EndpointInfo):
requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> List[Metric]:
def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
try:
response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
response.raise_for_status()
except requests.exceptions.RequestException:
return None
# Based on response headers defined in Data Prepper's PrometheusMetricsHandler.java class
metrics = response.content.decode('utf-8')
# Collect generator return values into list
Expand Down Expand Up @@ -55,17 +59,20 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None:
endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
prev_no_partitions_count = 0
terminal = False
target_doc_count = int(args.target_count)
while not terminal:
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint)
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, target_doc_count)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, args.target_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count

if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
time.sleep(wait_seconds)
# Loop terminated, shut down the Data Prepper pipeline
shutdown_pipeline(endpoint)
Expand All @@ -75,9 +82,9 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None:
# Set up parsing for command line arguments
arg_parser = argparse.ArgumentParser(
prog="python monitor.py",
description="Monitoring process for a running Data Prepper pipeline.\n" +
"The first input is the Data Prepper URL endpoint.\n" +
"The second input is the target doc_count for termination.",
description="""Monitoring process for a running Data Prepper pipeline.
The first input is the Data Prepper URL endpoint.
The second input is the target doc_count for termination.""",
formatter_class=argparse.RawTextHelpFormatter
)
# Required positional arguments
Expand All @@ -87,6 +94,7 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None:
)
arg_parser.add_argument(
"target_count",
type=int,
help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
)
cli_args = arg_parser.parse_args()
Expand Down
42 changes: 39 additions & 3 deletions FetchMigration/index_configuration_tool/tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

import requests
import responses
from prometheus_client.parser import text_string_to_metric_families

import monitor
Expand All @@ -20,15 +22,13 @@

class TestMonitor(unittest.TestCase):
@patch('requests.post')
# Note that mock objects are passed bottom-up from the patch order above
def test_shutdown(self, mock_post: MagicMock):
expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
monitor.shutdown_pipeline(test_endpoint)
mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)

@patch('requests.get')
# Note that mock objects are passed bottom-up from the patch order above
def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
expected_url = TEST_ENDPOINT + "/metrics/prometheus"
# Set up GET response
Expand All @@ -51,6 +51,15 @@ def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
self.assertEqual(TEST_METRIC_VALUE, test_sample.value)
self.assertTrue(len(test_sample.labels) > 0)

@responses.activate
def test_fetch_prometheus_metrics_failure(self):
# Set up expected GET call with a mock exception
expected_url = TEST_ENDPOINT + "/metrics/prometheus"
responses.get(expected_url, body=requests.Timeout())
# Test fetch
result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
self.assertIsNone(result)

def test_get_metric_value(self):
# Return value is an int
expected_val = int(TEST_METRIC_VALUE)
Expand All @@ -74,7 +83,6 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic
# The values here don't matter since we've mocked the check method
test_input.dp_endpoint = "test"
test_input.target_count = 1
mock_fetch.return_value = None
mock_get.return_value = None
# Check will first fail, then pass
mock_check.side_effect = [False, True]
Expand All @@ -83,6 +91,34 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic
monitor.run(test_input, wait_time)
# Test that fetch was called with the expected EndpointInfo
expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
self.assertEqual(2, mock_fetch.call_count)
mock_fetch.assert_called_with(expected_endpoint_info)
# We expect one wait cycle
mock_sleep.assert_called_once_with(wait_time)
mock_shut.assert_called_once_with(expected_endpoint_info)

@patch('monitor.shutdown_pipeline')
@patch('time.sleep')
@patch('monitor.check_if_complete')
@patch('monitor.get_metric_value')
@patch('monitor.fetch_prometheus_metrics')
# Note that mock objects are passed bottom-up from the patch order above
def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
mock_sleep: MagicMock, mock_shut: MagicMock):
test_input = argparse.Namespace()
# The values here don't matter since we've mocked the check method
test_input.dp_endpoint = "test"
test_input.target_count = 1
mock_get.return_value = None
mock_check.return_value = True
# Fetch call will first fail, then succeed
mock_fetch.side_effect = [None, MagicMock()]
# Run test method
wait_time = 3
monitor.run(test_input, wait_time)
# Test that fetch was called with the expected EndpointInfo
expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
self.assertEqual(2, mock_fetch.call_count)
mock_fetch.assert_called_with(expected_endpoint_info)
# We expect one wait cycle
mock_sleep.assert_called_once_with(wait_time)
Expand Down

0 comments on commit 361edef

Please sign in to comment.