diff --git a/README.md b/README.md
index b770a170d..20167799e 100644
--- a/README.md
+++ b/README.md
@@ -83,3 +83,35 @@ The release process is standard across repositories in this org and is run by a
2. The [release-drafter.yml](.github/workflows/release-drafter.yml) will be automatically kicked off and a draft release will be created.
3. This draft release triggers the [jenkins release workflow](https://build.ci.opensearch.org/job/opensearch-migrations-release) as a result of which the opensearch-migrations toolset is released and published on artifacts.opensearch.org example as https://artifacts.opensearch.org/migrations/0.1.0/opensearch-migrations-0.1.0.tar.gz.
4. Once the above release workflow is successful, the drafted release on GitHub is published automatically.
+
+## Publishing
+
+This project can be published to a local Maven repository with:
+```sh
+./gradlew publishToMavenLocal
+```
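+
+The published version defaults to `0.1.0-SNAPSHOT`; per the publishing configuration in `build.gradle`, it can be overridden with system properties (the version value below is illustrative):
+```sh
+./gradlew publishToMavenLocal -Dbuild.version=0.2.0 -Dbuild.snapshot=false
+```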
+
+The published artifacts can then be imported into a separate Gradle project (replace `name` with any published subproject's name):
+```groovy
+repositories {
+ mavenCentral()
+ mavenLocal()
+}
+
+dependencies {
+ implementation group: "org.opensearch.migrations.trafficcapture", name: "captureKafkaOffloader", version: "0.1.0-SNAPSHOT"
+ //... other dependencies
+}
+```
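+
+The `org.opensearch.migrations.trafficcapture` group and the default `0.1.0-SNAPSHOT` version shown here are set in the root `build.gradle`.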
+
+The full list of published subprojects can be viewed with:
+```sh
+./gradlew listPublishedArtifacts
+```
+
+To include a test fixtures dependency, declare the import like:
+
+```groovy
+testImplementation testFixtures('org.opensearch.migrations.trafficcapture:trafficReplayer:0.1.0-SNAPSHOT')
+```
diff --git a/RFS/src/main/java/com/rfs/common/Uid.java b/RFS/src/main/java/com/rfs/common/Uid.java
index fcec17a30..b7453b9b2 100644
--- a/RFS/src/main/java/com/rfs/common/Uid.java
+++ b/RFS/src/main/java/com/rfs/common/Uid.java
@@ -48,13 +48,13 @@ private static String decodeBase64Id(byte[] idBytes, int offset, int length) {
}
/** Decode an indexed id back to its original form.
- * @see #encodeId */
+ * @see org.elasticsearch.index.mapper.Uid#encodeId */
public static String decodeId(byte[] idBytes) {
return decodeId(idBytes, 0, idBytes.length);
}
/** Decode an indexed id back to its original form.
- * @see #encodeId */
+ * @see org.elasticsearch.index.mapper.Uid#encodeId */
public static String decodeId(byte[] idBytes, int offset, int length) {
if (length == 0) {
throw new IllegalArgumentException("Ids can't be empty");
diff --git a/TrafficCapture/README.md b/TrafficCapture/README.md
index 464576262..bcf4e21d7 100644
--- a/TrafficCapture/README.md
+++ b/TrafficCapture/README.md
@@ -176,50 +176,3 @@ echo 'export OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED=' >> ~/.zshrc
```
Access your detailed build reports by following the link provided at the end of your Gradle command's output.
-
-## Publishing
-
-This project can be published to a local maven repository with:
-```sh
-../gradlew publishToMavenLocal
-```
-
-And subsequently imported into a separate gradle project with (replacing name with any subProject name)
-```groovy
-repositories {
- mavenCentral()
- mavenLocal()
-}
-
-dependencies {
- implementation group: "org.opensearch.migrations.trafficcapture", name: "captureKafkaOffloader", version: "0.1.0-SNAPSHOT"
- //... other dependencies
-}
-```
-
-The entire list of published subprojects is
-```text
-captureKafkaOffloader
-captureOffloader
-captureProtobufs
-coreUtilities
-jsonJMESPathMessageTransformer
-jsonJMESPathMessageTransformerProvider
-jsonJoltMessageTransformer
-jsonJoltMessageTransformerProvider
-jsonMessageTransformerInterface
-jsonMessageTransformers
-nettyWireLogging
-openSearch23PlusTargetTransformerProvider
-transformationPlugins
-testUtilities
-trafficCaptureProxyServer
-trafficCaptureProxyServerTest
-trafficReplayer
-```
-
-To include a testFixture dependency, define the import like
-
-```groovy
-testImplementation testFixtures('org.opensearch.migrations.trafficcapture:trafficReplayer:0.1.0-SNAPSHOT')
-```
diff --git a/TrafficCapture/build.gradle b/TrafficCapture/build.gradle
index 77ef7e314..6b2a46e2e 100644
--- a/TrafficCapture/build.gradle
+++ b/TrafficCapture/build.gradle
@@ -27,78 +27,6 @@ subprojects {
}
}
}
-
- task javadocJar(type: Jar, dependsOn: javadoc) {
- archiveClassifier.set('javadoc')
- from javadoc.destinationDir
- }
- task sourcesJar(type: Jar) {
- archiveClassifier.set('sources')
- from sourceSets.main.allSource
- duplicatesStrategy = DuplicatesStrategy.EXCLUDE
- }
- def excludedProjects = [
- 'buildSrc',
- 'dockerSolution',
- ]
- if (!(project.name in excludedProjects)) {
- publishing {
- publications {
- mavenJava(MavenPublication) {
- from components.java
- artifact javadocJar
- artifact sourcesJar
-
-
- group = 'org.opensearch.migrations.trafficcapture'
-
- // support -Dbuild.version, but include default
- version = System.getProperty("build.version", "0.1.0")
-
- // support -Dbuild.snapshot=false, but default to true
- if (System.getProperty("build.snapshot", "true") == "true") {
- version += "-SNAPSHOT"
- }
-
-
- pom {
- name = project.name
- description = 'Everything opensearch migrations'
- url = 'http://github.com/opensearch-project/opensearch-migrations'
-
- licenses {
- license {
- name = 'The Apache License, Version 2.0'
- url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
- }
- }
- developers {
- developer {
- name = "OpenSearch"
- url = "https://github.com/opensearch-project/opensearch-migrations"
- }
- }
- scm {
- connection = "scm:git@github.com:opensearch-project/opensearch-migrations.git"
- developerConnection = "scm:git@github.com:opensearch-project/opensearch-migrations.git"
- url = "git@github.com:opensearch-project/opensearch-migrations.git"
- }
- }
-
- // Suppress POM metadata warnings for test fixtures
- suppressPomMetadataWarningsFor('testFixturesApiElements')
- suppressPomMetadataWarningsFor('testFixturesRuntimeElements')
- }
- }
- repositories {
- maven { url = "${rootProject.buildDir}/repository"}
- maven {
- url "https://aws.oss.sonatype.org/content/repositories/snapshots"
- name = 'staging'
- }
- }
- }
- }
}
allprojects {
diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
index ee7f2c88a..65c24f7a1 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
+++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
@@ -97,7 +97,7 @@ services:
- sharedReplayerOutput:/shared-replayer-output
- ./migrationConsole/lib/console_link/services.yaml:/etc/migration_services.yaml
# this is a convenience thing for testing -- it should be removed before this makes it to prod.
- - ./migrationConsole/lib/console_link:/root/lib/console_link
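+      # Uncomment to live-mount the console_link library into the container for local development.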
+ # - ./migrationConsole/lib/console_link:/root/lib/console_link
- ~/.aws:/root/.aws
environment:
- MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092
diff --git a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/requirements.txt b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/requirements.txt
index 19ce12e6c..9d0ea8024 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/requirements.txt
+++ b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/requirements.txt
@@ -6,4 +6,6 @@ tqdm==4.66.4
# Specified [h5py==3.10.0] as 3.11.0 version was causing issues with ARM64 builds
# Error: Building h5py requires pkg-config unless the HDF5 path is explicitly specified using the environment variable HDF5_DIR
-h5py==3.10.0
\ No newline at end of file
+h5py==3.10.0
+# Numpy 2.0 was released on 2024-06-16 and h5py is not yet compatible with it. Locking to a sub-2.0 version for now.
+numpy>=1.26.4,<2.0
\ No newline at end of file
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metrics_source.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metrics_source.py
index e7a09f608..36d07e7eb 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metrics_source.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metrics_source.py
@@ -130,7 +130,7 @@ class CloudwatchMetricsSource(MetricsSource):
def __init__(self, config: Dict) -> None:
super().__init__(config)
logger.info(f"Initializing CloudwatchMetricsSource from config {config}")
- if "aws_region" in config["cloudwatch"]:
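+        # The "cloudwatch" key may map to null in the services yaml (e.g. a bare "cloudwatch:" entry), so check it's a dict first.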
+        if isinstance(config["cloudwatch"], dict) and "aws_region" in config["cloudwatch"]:
self.aws_region = config["cloudwatch"]["aws_region"]
self.boto_config = botocore.config.Config(region_name=self.aws_region)
else:
diff --git a/build.gradle b/build.gradle
index 65d18085c..22a461f09 100644
--- a/build.gradle
+++ b/build.gradle
@@ -5,4 +5,101 @@ plugins {
task buildDockerImages() {
dependsOn(':TrafficCapture:dockerSolution:buildDockerImages')
dependsOn(':RFS:buildDockerImages')
+}
+
+subprojects {
+ apply plugin: 'java'
+ apply plugin: 'maven-publish'
+
+ tasks.withType(Test) {
+        // Compiling javadoc as part of the test suite ensures that we are able to publish our artifacts
+ dependsOn project.javadoc
+ }
+
+ task javadocJar(type: Jar, dependsOn: javadoc) {
+ archiveClassifier.set('javadoc')
+ from javadoc.destinationDir
+ }
+
+ task sourcesJar(type: Jar) {
+ archiveClassifier.set('sources')
+ from sourceSets.main.allSource
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+ }
+ def excludedProjectPaths = [
+ ':RFS',
+ ':TrafficCapture',
+ ':TrafficCapture:dockerSolution',
+ ]
+ if (!(project.path in excludedProjectPaths)) {
+ publishing {
+ publications {
+ mavenJava(MavenPublication) {
+ from components.java
+ artifact javadocJar
+ artifact sourcesJar
+
+ group = 'org.opensearch.migrations.trafficcapture'
+
+ // support -Dbuild.version, but include default
+ version = System.getProperty("build.version", "0.1.0")
+
+ // support -Dbuild.snapshot=false, but default to true
+ if (System.getProperty("build.snapshot", "true") == "true") {
+ version += "-SNAPSHOT"
+ }
+
+ pom {
+ name = project.name
+ description = 'Everything opensearch migrations'
+ url = 'http://github.com/opensearch-project/opensearch-migrations'
+
+ licenses {
+ license {
+ name = 'The Apache License, Version 2.0'
+ url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+ }
+ }
+ developers {
+ developer {
+ name = "OpenSearch"
+ url = "https://github.com/opensearch-project/opensearch-migrations"
+ }
+ }
+ scm {
+ connection = "scm:git@github.com:opensearch-project/opensearch-migrations.git"
+ developerConnection = "scm:git@github.com:opensearch-project/opensearch-migrations.git"
+ url = "git@github.com:opensearch-project/opensearch-migrations.git"
+ }
+ }
+
+ // Suppress POM metadata warnings for test fixtures
+ suppressPomMetadataWarningsFor('testFixturesApiElements')
+ suppressPomMetadataWarningsFor('testFixturesRuntimeElements')
+ }
+ }
+ repositories {
+            maven { url = "${rootProject.buildDir}/repository" }
+            maven {
+                url = "https://aws.oss.sonatype.org/content/repositories/snapshots"
+                name = 'staging'
+            }
+ }
+ }
+ }
+}
+
+task listPublishedArtifacts {
+ doLast {
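+        // Print the groupId and artifactId of every MavenPublication declared in subprojects.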
+ subprojects.each { proj ->
+ def publishingExtension = proj.extensions.findByType(PublishingExtension)
+ if (publishingExtension) {
+ publishingExtension.publications.each { publication ->
+ if (publication instanceof MavenPublication) {
+ println "${publication.groupId}.${publication.artifactId}"
+ }
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts
index 55d23b283..b410d61e0 100644
--- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts
@@ -2,13 +2,12 @@ import * as yaml from 'yaml';
export class ClusterYaml {
endpoint: string = '';
- no_auth?: string;
- basic_auth?: object;
+ no_auth?: string | null;
+ basic_auth?: object | null;
}
export class MetricsSourceYaml {
- type: string = 'cloudwatch';
- region?: string;
+    cloudwatch?: object | null = null;
}
export class ServicesYaml {
@@ -21,6 +20,9 @@ export class ServicesYaml {
source_cluster: this.source_cluster,
target_cluster: this.target_cluster,
metrics_source: this.metrics_source
+ },
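+      // Render null fields as empty YAML values (e.g. "cloudwatch:") rather than the string "null".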
+ {
+ 'nullStr': ''
})
}
}
diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
index e5667d6e7..c57035694 100644
--- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
@@ -244,7 +244,8 @@ export class MigrationConsoleStack extends MigrationServiceCore {
const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account)
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account)
- let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, clusterTasksPolicy, listTasksPolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy]
+ let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, clusterTasksPolicy,
+ listTasksPolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy, getMetricsPolicy]
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId)
servicePolicies = servicePolicies.concat(mskAdminPolicies)
@@ -318,8 +319,8 @@ export class MigrationConsoleStack extends MigrationServiceCore {
environment: environment,
taskRolePolicies: servicePolicies,
cpuArchitecture: props.fargateCpuArch,
- taskCpuUnits: 512,
- taskMemoryLimitMiB: 1024,
+ taskCpuUnits: 1024,
+ taskMemoryLimitMiB: 2048,
...props
});
}
diff --git a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts
index 15bf2dfe2..94cb1665c 100644
--- a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts
+++ b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts
@@ -81,7 +81,7 @@ test('Test that services yaml parameter is created', () => {
const yamlFileContents = value['Fn::Join'][1].join('')
expect(yamlFileContents).toContain('source_cluster')
expect(yamlFileContents).toContain('target_cluster')
- expect(yamlFileContents).toContain('metrics_source:\n type: cloudwatch')
+ expect(yamlFileContents).toContain('metrics_source:\n cloudwatch:')
// Validates that the file can be parsed as valid yaml and has the expected fields
const parsedFromYaml = yaml.parse(yamlFileContents);
diff --git a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts
index 909a47d88..a22a53ccf 100644
--- a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts
+++ b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts
@@ -3,9 +3,9 @@ import { ClusterYaml, ServicesYaml } from "../lib/migration-services-yaml"
test('Test default servicesYaml can be stringified', () => {
const servicesYaml = new ServicesYaml();
expect(servicesYaml.metrics_source).toBeDefined();
- expect(servicesYaml.metrics_source.type).toBe("cloudwatch");
+ expect(Object.keys(servicesYaml.metrics_source)).toContain("cloudwatch");
const yaml = servicesYaml.stringify();
- expect(yaml).toBe("metrics_source:\n type: cloudwatch\n");
+ expect(yaml).toBe("metrics_source:\n cloudwatch:\n");
})
test('Test servicesYaml with cluster can be stringified', () => {
@@ -15,7 +15,7 @@ test('Test servicesYaml with cluster can be stringified', () => {
expect(servicesYaml.target_cluster).toBeDefined();
const yaml = servicesYaml.stringify();
- expect(yaml).toBe(`target_cluster:\n endpoint: ${cluster.endpoint}\n no_auth: ""\nmetrics_source:\n type: cloudwatch\n`);
+ expect(yaml).toBe(`target_cluster:\n endpoint: ${cluster.endpoint}\n no_auth: ""\nmetrics_source:\n cloudwatch:\n`);
})
test('Test servicesYaml with cluster can be stringified', () => {
@@ -31,5 +31,5 @@ test('Test servicesYaml with cluster can be stringified', () => {
expect(servicesYaml.source_cluster).toBeDefined();
const yaml = servicesYaml.stringify();
const sourceClusterYaml = `source_cluster:\n endpoint: ${sourceCluster.endpoint}\n basic_auth:\n user: ${sourceClusterUser}\n password: ${sourceClusterPassword}\n`
- expect(yaml).toBe(`${sourceClusterYaml}target_cluster:\n endpoint: ${targetCluster.endpoint}\n no_auth: ""\nmetrics_source:\n type: cloudwatch\n`);
+ expect(yaml).toBe(`${sourceClusterYaml}target_cluster:\n endpoint: ${targetCluster.endpoint}\n no_auth: ""\nmetrics_source:\n cloudwatch:\n`);
})
diff --git a/test/operations.py b/test/operations.py
index 73eb7dd15..c72e62989 100644
--- a/test/operations.py
+++ b/test/operations.py
@@ -3,6 +3,11 @@
import string
import json
from requests import Session
+import shlex
+import subprocess
+import logging
+
+logger = logging.getLogger(__name__)
def create_index(endpoint: str, index_name: str, auth, verify_ssl: bool = False, session: Session = Session()):
@@ -67,3 +72,31 @@ def get_document(endpoint: str, index_name: str, doc_id: str, auth,
response = session.get(url, headers=headers, auth=auth, verify=verify_ssl)
return response
+
+
+class ContainerNotFoundError(Exception):
+ def __init__(self, container_filter):
+ super().__init__(f"No containers matching the filter '{container_filter}' were found.")
+
+
+def run_migration_console_command(deployment_type: str, command: str):
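+    """Run the given command on the migration console and return (returncode, stdout, stderr).
+
+    With a "local" deployment the command is executed inside the running
+    migration-console Docker container; otherwise it is run directly, since in
+    a cloud deployment the e2e tests already execute on the migration console.
+    """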
+ if deployment_type == "local":
+ filter_criteria = 'name=\"migration-console\"'
+ cmd = f'docker ps --format=\"{{{{.ID}}}}\" --filter {filter_criteria}'
+
+ get_container_process = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, text=True)
+ container_id = get_container_process.stdout.strip().replace('"', '')
+
+ if container_id:
+ cmd_exec = f"docker exec {container_id} bash -c '{command}'"
+ logger.warning(f"Running command: {cmd_exec} on container {container_id}")
+ process = subprocess.run(cmd_exec, shell=True, capture_output=True, text=True)
+ return process.returncode, process.stdout, process.stderr
+ else:
+ raise ContainerNotFoundError(filter_criteria)
+
+ else:
+ # In a cloud deployment case, we run the e2e tests directly on the migration console, so it's just a local call
+ logger.warning(f"Running command: {command} locally")
+        process = subprocess.run(command, shell=True, capture_output=True, text=True)
+ return process.returncode, process.stdout, process.stderr
diff --git a/test/tests.py b/test/tests.py
index 921ab69e0..b18f7ef9b 100644
--- a/test/tests.py
+++ b/test/tests.py
@@ -1,4 +1,5 @@
import boto3
+import functools
import json
import logging
import pytest
@@ -6,7 +7,6 @@
import secrets
import string
from operations import generate_large_doc
-import subprocess
import time
import unittest
from http import HTTPStatus
@@ -14,10 +14,10 @@
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError, SSLError
from requests_aws4auth import AWS4Auth
-from typing import Tuple, Callable
+from typing import Tuple, Callable, List, Dict
from operations import create_index, check_index, create_document, \
- delete_document, delete_index, get_document
+ delete_document, delete_index, get_document, run_migration_console_command
logger = logging.getLogger(__name__)
@@ -39,6 +39,25 @@ def get_doc_count(endpoint, index, auth, verify):
return count
+def assert_metrics_present(*wrapper_args, **wrapper_kwargs):
+    def decorator(test_func):
+        @functools.wraps(test_func)
+        def wrapper(self, *args, **kwargs):
+            # Initialize the flag before running the test so the finally block is
+            # safe even if the test raises something other than an AssertionError.
+            test_passed = False
+            try:
+                test_func(self, *args, **kwargs)
+                test_passed = True
+            finally:
+                if test_passed:
+                    # Only look for metrics if the test passed
+                    self.assert_metrics(*wrapper_args, **wrapper_kwargs)
+        return wrapper
+    return decorator
+
+
# The following "retry_request" function's purpose is to retry a certain request for "max_attempts"
# times every "delay" seconds IF the requests returned a status code other than what's expected.
# So this "retry_request" function's arguments are a request function's name and whatever arguments that function
@@ -137,6 +156,53 @@ def set_common_values(self):
def setUp(self):
self.set_common_values()
+    # Note that metric names differ somewhat between local and cloud deployments.
+    # The transformation is somewhat hardcoded here--the user should supply the local name, and if it's
+    # a cloud deployment, everything after the first `_` is discarded. This should generally make the
+    # names match, but there may be edge cases it doesn't account for.
+    # Note as well that currently the only way to correlate data with a given test is via the lookback
+    # time. Soon, we should implement a way to add a specific ID to metrics from a given run and check
+    # for the presence of that ID.
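+    # For example, "kafkaCommitCount_total" is queried as "kafkaCommitCount" in a cloud deployment.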
+ def assert_metric_has_data(self, component: str, metric: str, lookback_minutes: int):
+ command = f"console --json metrics get-data {component} {metric} --lookback {lookback_minutes}"
+ returncode, stdout, stderr = run_migration_console_command(
+ self.deployment_type,
+ command
+ )
+ self.assertEqual(returncode, 0, f"Return code from `{command}` was non-zero. Stderr output: {stderr}")
+ data = json.loads(stdout)
+ self.assertNotEqual(
+ len(data), 0,
+ f"Metric {metric} for component {component} does not exist or does "
+ f"not have data within the last {lookback_minutes} minutes"
+ )
+
+ def assert_metrics(self, expected_metrics: Dict[str, List[str]], lookback_minutes=2, wait_before_check_seconds=60):
+ """
+ This is the method invoked by the `@assert_metrics_present` decorator.
+ params:
+ expected_metrics: a dictionary of component->[metrics], for each metric that should be verified.
+ lookback_minutes: the number of minutes into the past to query for metrics
+ wait_before_check_seconds: the time in seconds to delay before checking for the presence of metrics
+ """
+        logger.debug(f"Waiting {wait_before_check_seconds} seconds before checking for metrics.")
+ time.sleep(wait_before_check_seconds)
+ for component, expected_comp_metrics in expected_metrics.items():
+ if component == "captureProxy" and self.deployment_type == "cloud":
+ # We currently do not emit captureProxy metrics from a non-standalone proxy, which is the scenario
+ # tested in our e2e tests. Therefore, we don't want to assert metrics exist in this situation. We
+ # should remove this clause as soon as we start testing the standalone proxy scenario.
+ logger.warning("Skipping metric verification for captureProxy metrics in a cloud deployment.")
+ continue
+ for expected_metric in expected_comp_metrics:
+ if self.deployment_type == 'cloud':
+ expected_metric = expected_metric.split('_', 1)[0]
+ self.assert_metric_has_data(component, expected_metric, lookback_minutes)
+
+ @assert_metrics_present({
+ 'captureProxy': ['kafkaCommitCount_total'],
+ 'replayer': ['kafkaCommitCount_total']
+ })
def test_0001_index(self):
# This test will verify that an index will be created (then deleted) on the target cluster when one is created
# on the source cluster by going through the proxy first. It will verify that the traffic is captured by the
@@ -271,29 +337,24 @@ def test_0005_invalidIncorrectUri(self):
self.assertEqual(response.status_code, HTTPStatus.METHOD_NOT_ALLOWED)
def test_0006_OSB(self):
+ cmd = "/root/runTestBenchmarks.sh"
+
if self.deployment_type == "cloud":
- cmd_exec = f"/root/runTestBenchmarks.sh --endpoint {self.proxy_endpoint}"
if self.source_auth_type == "none":
- cmd_exec = cmd_exec + " --no-auth"
+ auth_string = " --no-auth"
elif self.source_auth_type == "basic":
- cmd_exec = cmd_exec + f" --auth-user {self.source_username} --auth-pass {self.source_password}"
- logger.warning(f"Running local command: {cmd_exec}")
- subprocess.run(cmd_exec, shell=True)
- # TODO: Enhance our waiting logic for determining when all OSB records have been processed by Replayer
- time.sleep(360)
- else:
- cmd = ['docker', 'ps', '--format="{{.ID}}"', '--filter', 'name=migration']
- container_id = subprocess.run(cmd, stdout=subprocess.PIPE, text=True).stdout.strip().replace('"', '')
-
- if container_id:
- cmd_exec = f"docker exec {container_id} ./runTestBenchmarks.sh"
- logger.warning(f"Running command: {cmd_exec}")
- subprocess.run(cmd_exec, shell=True)
- time.sleep(5)
+ auth_string = f" --auth-user {self.source_username} --auth-pass {self.source_password}"
else:
- logger.error("Migration-console container was not found,"
- " please double check that deployment was a success")
- self.assert_(False)
+ auth_string = ""
+
+            cmd += f" --endpoint {self.proxy_endpoint}{auth_string}"
+ sleep_time = 360
+ else:
+ sleep_time = 5
+
+ returncode, _, stderr = run_migration_console_command(self.deployment_type, cmd)
+ self.assertEqual(returncode, 0, f"Running command {cmd} failed with stderr output:\n{stderr}")
+ time.sleep(sleep_time)
source_indices = get_indices(self.source_endpoint, self.source_auth, self.source_verify_ssl)
valid_source_indices = set([index for index in source_indices