[Testing] Add metric verification to e2e tests (#711)
* Add metric verification to e2e tests
* Cleanup & debugging
* Stop mounting console library directory & fixes
* Fix misnamed variable
* Update services-yaml in CDK to handle metrics source correctly
* Update CDK tests
* Fix bug and add metrics policy
* Experiment with increasing container size
* Fix CDK test for new yaml format
* Make assert message more helpful
* Fix (very) newly introduced numpy dependency issue
* Skip captureProxy metric checks if testing in cloud deployment
* Address review comments

---------

Signed-off-by: Mikayla Thompson <[email protected]>
mikaylathompson authored Jun 17, 2024
1 parent 20a2356 commit 2fa83e2
Showing 9 changed files with 136 additions and 37 deletions.

@@ -97,7 +97,7 @@ services:
       - sharedReplayerOutput:/shared-replayer-output
       - ./migrationConsole/lib/console_link/services.yaml:/etc/migration_services.yaml
       # this is a convenience thing for testing -- it should be removed before this makes it to prod.
-      - ./migrationConsole/lib/console_link:/root/lib/console_link
+      # - ./migrationConsole/lib/console_link:/root/lib/console_link
       - ~/.aws:/root/.aws
     environment:
       - MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092

@@ -6,4 +6,6 @@ tqdm==4.66.4
 
 # Specified [h5py==3.10.0] as 3.11.0 version was causing issues with ARM64 builds
 # Error: Building h5py requires pkg-config unless the HDF5 path is explicitly specified using the environment variable HDF5_DIR
-h5py==3.10.0
+h5py==3.10.0
+# Numpy 2.0 was released on 2024-06-16 and h5py is not yet compatible with it. Locking to a sub-2.0 version for now.
+numpy>=1.26.4,<2.0
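
Note: the new pin can be sanity-checked with the packaging library. A minimal
sketch, not part of this commit, assuming packaging is installed:

    # Confirm the specifier keeps the pinned floor but excludes numpy 2.0
    from packaging.specifiers import SpecifierSet

    spec = SpecifierSet(">=1.26.4,<2.0")
    print("1.26.4" in spec)  # True: the pinned floor is allowed
    print("2.0.0" in spec)   # False: numpy 2.0 is excluded until h5py supports it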

@@ -130,7 +130,7 @@ class CloudwatchMetricsSource(MetricsSource):
     def __init__(self, config: Dict) -> None:
         super().__init__(config)
         logger.info(f"Initializing CloudwatchMetricsSource from config {config}")
-        if "aws_region" in config["cloudwatch"]:
+        if type(config["cloudwatch"]) is dict and "aws_region" in config["cloudwatch"]:
             self.aws_region = config["cloudwatch"]["aws_region"]
             self.boto_config = botocore.config.Config(region_name=self.aws_region)
         else:
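
Note: the added type check guards against the new services.yaml shape, where the
cloudwatch key may carry no value. A minimal sketch of the failure mode, not part
of this commit and using PyYAML purely for illustration:

    import yaml  # PyYAML

    # A bare "cloudwatch:" key parses as None, not as an empty dict
    config = yaml.safe_load("metrics_source:\n  cloudwatch:\n")["metrics_source"]
    print(config["cloudwatch"])  # None

    # The old check, `"aws_region" in config["cloudwatch"]`, would raise
    # TypeError: argument of type 'NoneType' is not iterable.
    # The new guard short-circuits safely instead:
    print(type(config["cloudwatch"]) is dict and "aws_region" in config["cloudwatch"])  # False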

@@ -2,13 +2,12 @@ import * as yaml from 'yaml';
 
 export class ClusterYaml {
     endpoint: string = '';
-    no_auth?: string;
-    basic_auth?: object;
+    no_auth?: string | null;
+    basic_auth?: object | null;
 }
 
 export class MetricsSourceYaml {
-    type: string = 'cloudwatch';
-    region?: string;
+    cloudwatch? : object | null = null;
 }
 
 export class ServicesYaml {
@@ -21,6 +20,9 @@ export class ServicesYaml {
             source_cluster: this.source_cluster,
             target_cluster: this.target_cluster,
             metrics_source: this.metrics_source
+        },
+        {
+            'nullStr': ''
         })
     }
 }
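
Note: the 'nullStr': '' option asks the yaml library to emit null-valued keys as a
bare "key:" rather than "key: null", which is the shape the updated tests below
expect. A rough Python analogue, not part of this commit, using a PyYAML
representer:

    import yaml  # PyYAML

    def represent_none(dumper, _value):
        # Serialize None as an empty scalar so a null value renders as "cloudwatch:"
        return dumper.represent_scalar("tag:yaml.org,2002:null", "")

    yaml.add_representer(type(None), represent_none)
    print(yaml.dump({"metrics_source": {"cloudwatch": None}}, default_flow_style=False))
    # metrics_source:
    #   cloudwatch: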

@@ -244,7 +244,8 @@ export class MigrationConsoleStack extends MigrationServiceCore {
 
         const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account)
         const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account)
-        let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, clusterTasksPolicy, listTasksPolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy]
+        let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, clusterTasksPolicy,
+            listTasksPolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy, getMetricsPolicy]
         if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
             const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId)
             servicePolicies = servicePolicies.concat(mskAdminPolicies)
@@ -318,8 +319,8 @@
             environment: environment,
             taskRolePolicies: servicePolicies,
             cpuArchitecture: props.fargateCpuArch,
-            taskCpuUnits: 512,
-            taskMemoryLimitMiB: 1024,
+            taskCpuUnits: 1024,
+            taskMemoryLimitMiB: 2048,
             ...props
         });
     }

@@ -81,7 +81,7 @@ test('Test that services yaml parameter is created', () => {
     const yamlFileContents = value['Fn::Join'][1].join('')
     expect(yamlFileContents).toContain('source_cluster')
     expect(yamlFileContents).toContain('target_cluster')
-    expect(yamlFileContents).toContain('metrics_source:\n  type: cloudwatch')
+    expect(yamlFileContents).toContain('metrics_source:\n  cloudwatch:')
 
     // Validates that the file can be parsed as valid yaml and has the expected fields
     const parsedFromYaml = yaml.parse(yamlFileContents);

@@ -3,9 +3,9 @@ import { ClusterYaml, ServicesYaml } from "../lib/migration-services-yaml"
 test('Test default servicesYaml can be stringified', () => {
     const servicesYaml = new ServicesYaml();
     expect(servicesYaml.metrics_source).toBeDefined();
-    expect(servicesYaml.metrics_source.type).toBe("cloudwatch");
+    expect(Object.keys(servicesYaml.metrics_source)).toContain("cloudwatch");
     const yaml = servicesYaml.stringify();
-    expect(yaml).toBe("metrics_source:\n  type: cloudwatch\n");
+    expect(yaml).toBe("metrics_source:\n  cloudwatch:\n");
 })
 
 test('Test servicesYaml with cluster can be stringified', () => {
@@ -15,7 +15,7 @@ test('Test servicesYaml with cluster can be stringified', () => {
 
     expect(servicesYaml.target_cluster).toBeDefined();
     const yaml = servicesYaml.stringify();
-    expect(yaml).toBe(`target_cluster:\n  endpoint: ${cluster.endpoint}\n  no_auth: ""\nmetrics_source:\n  type: cloudwatch\n`);
+    expect(yaml).toBe(`target_cluster:\n  endpoint: ${cluster.endpoint}\n  no_auth: ""\nmetrics_source:\n  cloudwatch:\n`);
 })
 
 test('Test servicesYaml with cluster can be stringified', () => {
@@ -31,5 +31,5 @@ test('Test servicesYaml with cluster can be stringified', () => {
     expect(servicesYaml.source_cluster).toBeDefined();
     const yaml = servicesYaml.stringify();
     const sourceClusterYaml = `source_cluster:\n  endpoint: ${sourceCluster.endpoint}\n  basic_auth:\n    user: ${sourceClusterUser}\n    password: ${sourceClusterPassword}\n`
-    expect(yaml).toBe(`${sourceClusterYaml}target_cluster:\n  endpoint: ${targetCluster.endpoint}\n  no_auth: ""\nmetrics_source:\n  type: cloudwatch\n`);
+    expect(yaml).toBe(`${sourceClusterYaml}target_cluster:\n  endpoint: ${targetCluster.endpoint}\n  no_auth: ""\nmetrics_source:\n  cloudwatch:\n`);
 })

test/operations.py: 33 additions & 0 deletions
@@ -3,6 +3,11 @@
 import string
 import json
 from requests import Session
+import shlex
+import subprocess
+import logging
+
+logger = logging.getLogger(__name__)
 
 
 def create_index(endpoint: str, index_name: str, auth, verify_ssl: bool = False, session: Session = Session()):
@@ -67,3 +72,31 @@ def get_document(endpoint: str, index_name: str, doc_id: str, auth,
     response = session.get(url, headers=headers, auth=auth, verify=verify_ssl)
 
     return response
+
+
+class ContainerNotFoundError(Exception):
+    def __init__(self, container_filter):
+        super().__init__(f"No containers matching the filter '{container_filter}' were found.")
+
+
+def run_migration_console_command(deployment_type: str, command: str):
+    if deployment_type == "local":
+        filter_criteria = 'name=\"migration-console\"'
+        cmd = f'docker ps --format=\"{{{{.ID}}}}\" --filter {filter_criteria}'
+
+        get_container_process = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, text=True)
+        container_id = get_container_process.stdout.strip().replace('"', '')
+
+        if container_id:
+            cmd_exec = f"docker exec {container_id} bash -c '{command}'"
+            logger.warning(f"Running command: {cmd_exec} on container {container_id}")
+            process = subprocess.run(cmd_exec, shell=True, capture_output=True, text=True)
+            return process.returncode, process.stdout, process.stderr
+        else:
+            raise ContainerNotFoundError(filter_criteria)
+
+    else:
+        # In a cloud deployment case, we run the e2e tests directly on the migration console, so it's just a local call
+        logger.warning(f"Running command: {command} locally")
+        process = subprocess.run(command, shell=True, capture_output=True)
+        return process.returncode, process.stdout, process.stderr
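
Note: a hypothetical invocation of the helper above, mirroring how tests.py uses it
below; the component and metric names are taken from this diff:

    # Runs a console CLI command on the local migration-console container and
    # returns the exit code and output for the caller to assert on.
    returncode, stdout, stderr = run_migration_console_command(
        "local",
        "console --json metrics get-data replayer kafkaCommitCount_total --lookback 5"
    )
    assert returncode == 0, stderr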

test/tests.py: 83 additions & 22 deletions
@@ -1,23 +1,23 @@
 import boto3
+import functools
 import json
 import logging
 import pytest
 import requests
 import secrets
 import string
-from operations import generate_large_doc
 import subprocess
 import time
 import unittest
 from http import HTTPStatus
 from requests import Session
 from requests.adapters import HTTPAdapter
 from requests.exceptions import ConnectionError, SSLError
 from requests_aws4auth import AWS4Auth
-from typing import Tuple, Callable
+from typing import Tuple, Callable, List, Dict
 
 from operations import create_index, check_index, create_document, \
-    delete_document, delete_index, get_document
+    delete_document, delete_index, get_document, run_migration_console_command
 
 logger = logging.getLogger(__name__)
@@ -39,6 +39,25 @@ def get_doc_count(endpoint, index, auth, verify):
     return count
 
 
+def assert_metrics_present(*wrapper_args, **wrapper_kwargs):
+    def decorator(test_func):
+        @functools.wraps(test_func)
+        def wrapper(self, *args, **kwargs):
+            # Run the original test function
+            test_passed = False
+            try:
+                test_func(self, *args, **kwargs)
+                test_passed = True
+            except AssertionError as e:
+                raise e
+            finally:
+                if test_passed:
+                    # Only look for metrics if the test passed
+                    self.assert_metrics(*wrapper_args, **wrapper_kwargs)
+        return wrapper
+    return decorator
+
+
 # The following "retry_request" function's purpose is to retry a certain request for "max_attempts"
 # times every "delay" seconds IF the requests returned a status code other than what's expected.
 # So this "retry_request" function's arguments are a request function's name and whatever arguments that function
@@ -137,6 +156,53 @@ def set_common_values(self):
     def setUp(self):
         self.set_common_values()
 
+    # Note that the names of metrics are a bit different in a local vs cloud deployment.
+    # The transformation is somewhat hardcoded here -- the user should put in the local name, and if it's
+    # a cloud deployment, everything after the first `_` will be discarded. This should generally cause
+    # things to match, but it's possible there are edge cases that it doesn't account for.
+    # Note as well that currently the only way of assuming data is correlated with a given test is via
+    # the lookback time. Soon, we should implement a way to add a specific ID to metrics from a given run
+    # and check for the presence of that ID.
+    def assert_metric_has_data(self, component: str, metric: str, lookback_minutes: int):
+        command = f"console --json metrics get-data {component} {metric} --lookback {lookback_minutes}"
+        returncode, stdout, stderr = run_migration_console_command(
+            self.deployment_type,
+            command
+        )
+        self.assertEqual(returncode, 0, f"Return code from `{command}` was non-zero. Stderr output: {stderr}")
+        data = json.loads(stdout)
+        self.assertNotEqual(
+            len(data), 0,
+            f"Metric {metric} for component {component} does not exist or does "
+            f"not have data within the last {lookback_minutes} minutes"
+        )
+
+    def assert_metrics(self, expected_metrics: Dict[str, List[str]], lookback_minutes=2, wait_before_check_seconds=60):
+        """
+        This is the method invoked by the `@assert_metrics_present` decorator.
+        params:
+            expected_metrics: a dictionary of component->[metrics] for each metric that should be verified
+            lookback_minutes: the number of minutes into the past to query for metrics
+            wait_before_check_seconds: the time in seconds to delay before checking for the presence of metrics
+        """
+        logger.debug(f"Waiting {wait_before_check_seconds} seconds before checking for metrics.")
+        time.sleep(wait_before_check_seconds)
+        for component, expected_comp_metrics in expected_metrics.items():
+            if component == "captureProxy" and self.deployment_type == "cloud":
+                # We currently do not emit captureProxy metrics from a non-standalone proxy, which is the scenario
+                # tested in our e2e tests. Therefore, we don't want to assert metrics exist in this situation. We
+                # should remove this clause as soon as we start testing the standalone proxy scenario.
+                logger.warning("Skipping metric verification for captureProxy metrics in a cloud deployment.")
+                continue
+            for expected_metric in expected_comp_metrics:
+                if self.deployment_type == 'cloud':
+                    expected_metric = expected_metric.split('_', 1)[0]
+                self.assert_metric_has_data(component, expected_metric, lookback_minutes)
+
+    @assert_metrics_present({
+        'captureProxy': ['kafkaCommitCount_total'],
+        'replayer': ['kafkaCommitCount_total']
+    })
     def test_0001_index(self):
         # This test will verify that an index will be created (then deleted) on the target cluster when one is created
         # on the source cluster by going through the proxy first. It will verify that the traffic is captured by the
@@ -271,29 +337,24 @@ def test_0005_invalidIncorrectUri(self):
         self.assertEqual(response.status_code, HTTPStatus.METHOD_NOT_ALLOWED)
 
     def test_0006_OSB(self):
+        cmd = "/root/runTestBenchmarks.sh"
+
         if self.deployment_type == "cloud":
-            cmd_exec = f"/root/runTestBenchmarks.sh --endpoint {self.proxy_endpoint}"
             if self.source_auth_type == "none":
-                cmd_exec = cmd_exec + " --no-auth"
+                auth_string = " --no-auth"
             elif self.source_auth_type == "basic":
-                cmd_exec = cmd_exec + f" --auth-user {self.source_username} --auth-pass {self.source_password}"
-            logger.warning(f"Running local command: {cmd_exec}")
-            subprocess.run(cmd_exec, shell=True)
-            # TODO: Enhance our waiting logic for determining when all OSB records have been processed by Replayer
-            time.sleep(360)
-        else:
-            cmd = ['docker', 'ps', '--format="{{.ID}}"', '--filter', 'name=migration']
-            container_id = subprocess.run(cmd, stdout=subprocess.PIPE, text=True).stdout.strip().replace('"', '')
-
-            if container_id:
-                cmd_exec = f"docker exec {container_id} ./runTestBenchmarks.sh"
-                logger.warning(f"Running command: {cmd_exec}")
-                subprocess.run(cmd_exec, shell=True)
-                time.sleep(5)
+                auth_string = f" --auth-user {self.source_username} --auth-pass {self.source_password}"
             else:
-                logger.error("Migration-console container was not found,"
-                             " please double check that deployment was a success")
-                self.assert_(False)
+                auth_string = ""
+
+            cmd += f" --endpoint {self.proxy_endpoint} {auth_string}"
+            sleep_time = 360
+        else:
+            sleep_time = 5
+
+        returncode, _, stderr = run_migration_console_command(self.deployment_type, cmd)
+        self.assertEqual(returncode, 0, f"Running command {cmd} failed with stderr output:\n{stderr}")
+        time.sleep(sleep_time)
 
         source_indices = get_indices(self.source_endpoint, self.source_auth, self.source_verify_ssl)
         valid_source_indices = set([index for index in source_indices
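
Note: the local-to-cloud metric name transformation described in the comments above
is a single split. A minimal sketch, not part of this commit:

    # In a cloud deployment, everything after the first "_" is discarded
    metric = "kafkaCommitCount_total"
    print(metric.split("_", 1)[0])  # kafkaCommitCount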
