[Fetch Migration] Monitoring script for Data Prepper (#264)
* [Fetch Migration] Monitoring script for Data Prepper

This change adds monitor.py, a Python script that monitors a running Data Prepper pipeline and shuts it down once the target document count has been reached (as reported by Data Prepper's documentsSuccess Prometheus metric) and the pipeline is detected to be idle. An example invocation is sketched below.

Signed-off-by: Kartik Ganesh <[email protected]>

---------

Signed-off-by: Kartik Ganesh <[email protected]>
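
As a rough usage sketch, the script takes the Data Prepper URL endpoint and the target document count as positional arguments and polls the metrics endpoint on a fixed interval (30 seconds in this change). The endpoint URL and document count below are illustrative placeholders, not values taken from this change:

    python monitor.py http://localhost:4900 1000000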
kartg authored Aug 31, 2023
1 parent 92405b8 commit 81f8a99
Showing 3 changed files with 247 additions and 0 deletions.
103 changes: 103 additions & 0 deletions FetchMigration/index_configuration_tool/monitor.py
@@ -0,0 +1,103 @@
import argparse
import time
from typing import Optional, List

import requests
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


def shutdown_pipeline(endpoint: EndpointInfo):
    shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
    requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
    metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
    try:
        response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return None
    # Based on response headers defined in Data Prepper's PrometheusMetricsHandler.java class
    metrics = response.content.decode('utf-8')
    # Collect generator return values into list
    return list(text_string_to_metric_families(metrics))


def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]:
    for metric_family in metric_families:
        if metric_family.name.endswith(metric_suffix):
            return int(metric_family.samples[0].value)
    return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
                      prev_no_part_count: int, target: int) -> bool:
    # Check for target doc_count
    # TODO Add a check for partitionsCompleted = indices
    if doc_count is not None and doc_count >= target:
        # Check for idle pipeline
        if in_flight is not None and in_flight == 0:
            # No-partitions metrics should steadily tick up
            if no_part_count is not None and no_part_count > prev_no_part_count > 0:
                return True
    return False


def run(args: argparse.Namespace, wait_seconds: int) -> None:
    # TODO Remove hardcoded EndpointInfo
    default_auth = ('admin', 'admin')
    endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
    prev_no_partitions_count = 0
    terminal = False
    while not terminal:
        # If the API call fails, the response is empty
        metrics = fetch_prometheus_metrics(endpoint)
        if metrics is not None:
            success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
            rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
            no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
            terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
                                         prev_no_partitions_count, args.target_count)
            if not terminal:
                # Save no_partitions_count
                prev_no_partitions_count = no_partitions_count

        if not terminal:
            time.sleep(wait_seconds)
    # Loop terminated, shut down the Data Prepper pipeline
    shutdown_pipeline(endpoint)


if __name__ == '__main__':  # pragma no cover
    # Set up parsing for command line arguments
    arg_parser = argparse.ArgumentParser(
        prog="python monitor.py",
        description="""Monitoring process for a running Data Prepper pipeline.
        The first input is the Data Prepper URL endpoint.
        The second input is the target doc_count for termination.""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    # Required positional arguments
    arg_parser.add_argument(
        "dp_endpoint",
        help="URL endpoint for the running Data Prepper process"
    )
    arg_parser.add_argument(
        "target_count",
        type=int,
        help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
    )
    cli_args = arg_parser.parse_args()
    print("\n##### Starting monitor tool... #####\n")
    run(cli_args, 30)
    print("\n##### Ending monitor tool... #####\n")
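
monitor.py imports EndpointInfo from endpoint_info, which is not part of this diff. Based on how the script and the tests use it (a URL, an optional auth tuple that defaults to None, a verify_ssl flag that defaults to True, and value-based equality), a minimal sketch of its shape might look like the following; the actual class in the repository may differ.

from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class EndpointInfo:
    # Hypothetical reconstruction inferred from usage in monitor.py and test_monitor.py;
    # the defaults mirror the auth=None, verify=True behavior asserted in the tests
    url: str
    auth: Optional[Tuple[str, str]] = None
    verify_ssl: bool = True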
1 change: 1 addition & 0 deletions FetchMigration/index_configuration_tool/requirements.txt
@@ -1,4 +1,5 @@
jsondiff>=2.0.0
prometheus-client>=0.17.1
pyyaml>=6.0
requests>=2.28.2
responses>=0.23.1
143 changes: 143 additions & 0 deletions FetchMigration/index_configuration_tool/tests/test_monitor.py
@@ -0,0 +1,143 @@
import argparse
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

import requests
import responses
from prometheus_client.parser import text_string_to_metric_families

import monitor
from endpoint_info import EndpointInfo

# Constants
TEST_ENDPOINT = "test"
TEST_AUTH = ("user", "pass")
TEST_FLAG = False
TEST_METRIC_NAME = "test_metric"
TEST_METRIC_VALUE = 123.45
TEST_PROMETHEUS_METRIC_STRING = "# HELP " + TEST_METRIC_NAME + " Unit Test Metric\n" \
                                + "# TYPE " + TEST_METRIC_NAME + " gauge\n" \
                                + TEST_METRIC_NAME + "{serviceName=\"unittest\",} " + str(TEST_METRIC_VALUE)


class TestMonitor(unittest.TestCase):
    @patch('requests.post')
    def test_shutdown(self, mock_post: MagicMock):
        expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
        test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
        monitor.shutdown_pipeline(test_endpoint)
        mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)

    @patch('requests.get')
    def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        # Set up GET response
        mock_response = MagicMock()
        # content is a property
        mock_content = PropertyMock(return_value=bytes(TEST_PROMETHEUS_METRIC_STRING, "utf-8"))
        type(mock_response).content = mock_content
        mock_get.return_value = mock_response
        # Test fetch
        raw_metrics_list = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        mock_get.assert_called_once_with(expected_url, auth=None, verify=True)
        self.assertEqual(1, len(raw_metrics_list))
        test_metric = raw_metrics_list[0]
        self.assertEqual(TEST_METRIC_NAME, test_metric.name)
        self.assertTrue(len(test_metric.type) > 0)
        self.assertTrue(len(test_metric.documentation) > 0)
        self.assertEqual(1, len(test_metric.samples))
        test_sample = test_metric.samples[0]
        self.assertEqual(TEST_METRIC_NAME, test_sample.name)
        self.assertEqual(TEST_METRIC_VALUE, test_sample.value)
        self.assertTrue(len(test_sample.labels) > 0)

    @responses.activate
    def test_fetch_prometheus_metrics_failure(self):
        # Set up expected GET call with a mock exception
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        responses.get(expected_url, body=requests.Timeout())
        # Test fetch
        result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        self.assertIsNone(result)

    def test_get_metric_value(self):
        # Return value is an int
        expected_val = int(TEST_METRIC_VALUE)
        test_input = list(text_string_to_metric_families(TEST_PROMETHEUS_METRIC_STRING))
        # Should fetch by suffix
        val = monitor.get_metric_value(test_input, "metric")
        self.assertEqual(expected_val, val)
        # No matching metric returns None
        val = monitor.get_metric_value(test_input, "invalid")
        self.assertEqual(None, val)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
                 mock_shut: MagicMock):
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        # Check will first fail, then pass
        mock_check.side_effect = [False, True]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
                                    mock_sleep: MagicMock, mock_shut: MagicMock):
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        mock_check.return_value = True
        # Fetch call will first fail, then succeed
        mock_fetch.side_effect = [None, MagicMock()]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    def test_check_if_complete(self):
        # If any of the optional values are missing, we are not complete
        self.assertFalse(monitor.check_if_complete(None, 0, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, None, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, 0, None, 0, 2))
        # Target count not reached
        self.assertFalse(monitor.check_if_complete(1, None, None, 0, 2))
        # Target count reached, but has records in flight
        self.assertFalse(monitor.check_if_complete(2, 1, None, 0, 2))
        # Target count reached, no records in flight, but no prev no_part_count
        self.assertFalse(monitor.check_if_complete(2, 0, 1, 0, 2))
        # Terminal state
        self.assertTrue(monitor.check_if_complete(2, 0, 2, 1, 2))


if __name__ == '__main__':
    unittest.main()
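
Assuming the standard unittest runner, a working directory of FetchMigration/index_configuration_tool (so that the monitor and endpoint_info modules are importable), and that the tests directory is importable as a package, the suite could presumably be run with something like:

    python -m unittest tests/test_monitor.py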
