Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrations - Capture and replay v0.1.0 - Extracted Comparator and Jupyter #271

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions TrafficCapture/dockerSolution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ down again.
Notice that most of the Dockerfiles are dynamically constructed in the build hierarchy. Some efforts have been made
to ensure that changes will make it into containers to be launched.

If a user wants to use their own checkout of the traffic-comparator repo, just set the environment variable
"TRAFFIC_COMPARATOR_DIRECTORY" to the directory that contains `setup.py`. Otherwise, if that isn't set, the traffic
comparator repo will be checked out to the build directory and that will be used. Notice that the checkout happens when
the directory wasn't present and there wasn't an environment variable specifying a directory. Once a directory exists,
it will be mounted to the traffic-comparator and jupyter services.

### Running the Docker Solution

While Docker is running, in the TrafficCapture directory run the following command:
Expand Down
64 changes: 1 addition & 63 deletions TrafficCapture/dockerSolution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@ import java.security.MessageDigest
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import org.apache.tools.ant.taskdefs.condition.Os

def getTrafficComparatorDirectory() {
String overrideTrafficComparatorDirectory = System.getenv(TRAFFIC_COMPARATOR_DIRECTORY_ENV)
String rval = overrideTrafficComparatorDirectory != null ?
overrideTrafficComparatorDirectory : TRAFFIC_COMPARATOR_REPO_DIRECTORY;
return rval
}

ext {
TRAFFIC_COMPARATOR_REPO_DIRECTORY = "build/traffic-comparator"
TRAFFIC_COMPARATOR_DIRECTORY_ENV = "TRAFFIC_COMPARATOR_DIRECTORY"
REALIZED_TRAFFIC_COMPARATOR_DIRECTORY = project.file(getTrafficComparatorDirectory())
}

def calculateDockerHash = { projectName ->
CommonUtils.calculateDockerHash(projectName, project)
}
Expand All @@ -31,17 +18,6 @@ dependencies {
implementation project(':trafficReplayer')
}

task cloneComparatorRepoIfNeeded(type: Exec) {
String comparatorDirectory = project.file(REALIZED_TRAFFIC_COMPARATOR_DIRECTORY);
String repo = 'https://github.com/opensearch-project/traffic-comparator.git'
onlyIf {
!(new File(comparatorDirectory).exists())
}
commandLine = Os.isFamily(Os.FAMILY_WINDOWS) ?
['git', 'clone', repo, TRAFFIC_COMPARATOR_REPO_DIRECTORY ] :
['/bin/sh', '-c', "git clone ${repo} ${TRAFFIC_COMPARATOR_REPO_DIRECTORY}"]
}

def dockerFilesForExternalServices = [
"elasticsearchWithSearchGuard": "elasticsearch_searchguard",
"migrationConsole": "migration_console"
Expand All @@ -56,36 +32,6 @@ dockerFilesForExternalServices.each { projectName, dockerImageName ->
}
}

def trafficComparatorServices = [
"trafficComparator": "traffic_comparator",
"jupyterNotebook": "jupyter_notebook"
]
trafficComparatorServices.forEach {projectName, dockerImageName ->
def dockerBuildDir = "build/docker/${projectName}"
task("copyArtifact_${projectName}", type: Copy) {
dependsOn(tasks.getByName('cloneComparatorRepoIfNeeded'))
from REALIZED_TRAFFIC_COMPARATOR_DIRECTORY
into dockerBuildDir
include '*.py'
include '/traffic_comparator/*'
if (projectName == 'jupyterNotebook') {
include '*.ipynb'
}
}

task "createDockerfile_${projectName}"(type: com.bmuschko.gradle.docker.tasks.image.Dockerfile) {
dependsOn "copyArtifact_${projectName}"
destFile = project.file("${dockerBuildDir}/Dockerfile")
from 'python:3.10.10'
runCommand("apt-get update && apt-get install -y netcat lsof")
copyFile("setup.py", "/setup.py")
copyFile(".", "/containerTC/")
runCommand("pip3 install --editable \".[data]\"")
// container stay-alive
defaultCommand('tail', '-f', '/dev/null')
}
}

def javaContainerServices = [
"trafficCaptureProxyServer": "capture_proxy",
"trafficReplayer": "traffic_replayer"
Expand All @@ -101,7 +47,7 @@ javaContainerServices.each { projectName, dockerImageName ->
CommonUtils.createDockerfile(project, projectName, baseImageProjectOverrides, dockerFilesForExternalServices)
}

(javaContainerServices + trafficComparatorServices).forEach { projectName, dockerImageName ->
(javaContainerServices).forEach { projectName, dockerImageName ->
def dockerBuildDir = "build/docker/${projectName}"
task "buildDockerImage_${projectName}"(type: DockerBuildImage) {
dependsOn "createDockerfile_${projectName}"
Expand All @@ -112,11 +58,6 @@ javaContainerServices.each { projectName, dockerImageName ->
}

dockerCompose {
String overrideTrafficComparatorDirectory = System.getenv(TRAFFIC_COMPARATOR_DIRECTORY_ENV)
if (overrideTrafficComparatorDirectory == null) {
environment.put(TRAFFIC_COMPARATOR_DIRECTORY_ENV, REALIZED_TRAFFIC_COMPARATOR_DIRECTORY)
exposeAsEnvironment(this)
}
useComposeFiles.add("src/main/docker/docker-compose.yml")
}

Expand All @@ -126,10 +67,7 @@ task buildDockerImages {

dependsOn buildDockerImage_trafficCaptureProxyServer
dependsOn buildDockerImage_trafficReplayer
dependsOn buildDockerImage_trafficComparator
dependsOn buildDockerImage_jupyterNotebook
}

tasks.getByName('composeUp')
.dependsOn(tasks.getByName('buildDockerImages'))
.dependsOn(tasks.getByName('cloneComparatorRepoIfNeeded'))
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
trafficcomparator:
condition: service_healthy
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46YWRtaW4= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group | nc trafficcomparator 9220"
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46YWRtaW4= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group >/dev/null"

opensearchtarget:
image: 'opensearchproject/opensearch:latest'
Expand All @@ -89,33 +87,6 @@ services:
ports:
- "29200:9200"

trafficcomparator:
image: 'migrations/traffic_comparator:latest'
networks:
- migrations
ports:
- "9220:9220"
healthcheck:
test: "lsof -i -P -n"
volumes:
- ${TRAFFIC_COMPARATOR_DIRECTORY}:/trafficComparator
- sharedComparatorSqlResults:/shared
command: /bin/sh -c "cd trafficComparator && pip3 install --editable . && nc -v -l -p 9220 | tee /dev/stderr | trafficcomparator -vv stream | trafficcomparator dump-to-sqlite --db /shared/comparisons.db"

jupyter_notebook:
image: 'migrations/jupyter_notebook:latest'
networks:
- migrations
ports:
- "8888:8888"
volumes:
- ${TRAFFIC_COMPARATOR_DIRECTORY}:/trafficComparator
- sharedComparatorSqlResults:/shared
environment:
# this needs to match the output db that traffic_comparator writes to
- COMPARISONS_DB_LOCATION=/shared/comparisons.db
command: /bin/sh -c 'cd trafficComparator && pip3 install --editable ".[data]" && jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser --allow-root'

migration_console:
image: 'migrations/migration_console:latest'
networks:
Expand All @@ -128,8 +99,6 @@ volumes:
driver: local
kafka_data:
driver: local
sharedComparatorSqlResults:
driver: local
sharedReplayerOutput:
driver: local

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Disabled;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -19,6 +20,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Disabled
@Slf4j
class ExpiringSubstitutableItemPoolTest {

Expand Down Expand Up @@ -144,4 +146,4 @@ private static Integer getIntegerItem(AtomicInteger builtItemCursor,
lastCreation.set(Instant.now());
return Integer.valueOf(builtItemCursor.incrementAndGet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* transformation and the headers present (such as for gzipped or chunked encodings).
*
* There will be a number of reasons that we need to reuse the source captured packets - both today
* (for the output to the comparator) and in the future (for retrying transient network errors or
* (for the output) and in the future (for retrying transient network errors or
* transformation errors). With that in mind, the HttpJsonTransformer now keeps track of all of
* the ByteBufs passed into it and can redrive them through the underlying network packet handler.
* Cases where that would happen with this edit are where the payload wasn't being modified, nor
Expand All @@ -52,7 +52,7 @@ public class HttpJsonTransformingConsumer implements IPacketFinalizingConsumer<A
private final List<List<Integer>> chunkSizes;
// This is here for recovery, in case anything goes wrong with a transformation & we want to
// just dump it directly. Notice that we're already storing all of the bytes until the response
// comes back so that we can format the output that goes to the comparator. These should be
// comes back so that we can format the output that goes downstream. These should be
// backed by the exact same byte[] arrays, so the memory consumption should already be absorbed.
private final List<ByteBuf> chunks;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,6 @@ export class MigrationAssistanceStack extends Stack {
});
this.mskARN = mskCluster.attrArn

const comparatorSQLiteSG = new SecurityGroup(this, 'comparatorSQLiteSG', {
vpc: props.vpc,
allowAllOutbound: false,
});
comparatorSQLiteSG.addIngressRule(comparatorSQLiteSG, Port.allTraffic());

// Create an EFS file system for the traffic-comparator
const comparatorSQLiteEFS = new FileSystem(this, 'comparatorSQLiteEFS', {
vpc: props.vpc,
securityGroup: comparatorSQLiteSG
});

const replayerOutputSG = new SecurityGroup(this, 'replayerOutputSG', {
vpc: props.vpc,
allowAllOutbound: false,
Expand All @@ -131,8 +119,6 @@ export class MigrationAssistanceStack extends Stack {
const exports = [
`export MIGRATION_VPC_ID=${props.vpc.vpcId}`,
`export MIGRATION_CAPTURE_MSK_SG_ID=${mskSecurityGroup.securityGroupId}`,
`export MIGRATION_COMPARATOR_EFS_ID=${comparatorSQLiteEFS.fileSystemId}`,
`export MIGRATION_COMPARATOR_EFS_SG_ID=${comparatorSQLiteSG.securityGroupId}`,
`export MIGRATION_REPLAYER_OUTPUT_EFS_ID=${replayerOutputEFS.fileSystemId}`,
`export MIGRATION_REPLAYER_OUTPUT_EFS_SG_ID=${replayerOutputSG.securityGroupId}`]
if (publicSubnetString) exports.push(`export MIGRATION_PUBLIC_SUBNETS=${publicSubnetString}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {AnyPrincipal, Effect, PolicyStatement} from "aws-cdk-lib/aws-iam";
import * as defaultValuesJson from "../default-values.json"
import {NetworkStack} from "./network-stack";
import {MigrationAssistanceStack} from "./migration-assistance-stack";
import {HistoricalCaptureStack} from "./historical-capture-stack";
import {MSKUtilityStack} from "./msk-utility-stack";

export interface StackPropsExt extends StackProps {
Expand Down Expand Up @@ -187,25 +186,6 @@ export class StackComposer {
this.stacks.push(mskUtilityStack)
}

// Currently, a VPC is required for a historical capture stack, but this can be revisited
// Note: Future work to provide orchestration between historical capture and migration assistance as the current
// state will potentially have both stacks trying to add the same data
if (historicalCaptureEnabled && networkStack) {
const historicalCaptureStack = new HistoricalCaptureStack(scope, "historicalCaptureStack", {
vpc: networkStack.vpc,
logstashConfigFilePath: logstashConfigFilePath,
sourceEndpoint: sourceClusterEndpoint,
targetEndpoint: opensearchStack.domainEndpoint,
stackName: `OSServiceHistoricalCDKStack-${stage}-${region}`,
description: "This stack contains resources to assist migrating historical data to an OpenSearch Service domain",
...props,
})

historicalCaptureStack.addDependency(opensearchStack)
this.stacks.push(historicalCaptureStack)
}


function getContextForType(optionName: string, expectedType: string): any {
const option = scope.node.tryGetContext(optionName)

Expand Down
12 changes: 2 additions & 10 deletions deployment/copilot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Copilot is a tool for deploying containerized applications on AWS ECS. Official
###### Docker
Docker is used by Copilot to build container images. If not installed, follow the steps [here](https://docs.docker.com/engine/install/) to set up. Later versions are recommended.
###### Git
Git is used by the opensearch-migrations repo to fetch associated repositories (such as the traffic-comparator repo) for constructing their respective Dockerfiles. Steps to set up can be found [here](https://github.com/git-guides/install-git).
Git is used by the opensearch-migrations repo. Steps to set up can be found [here](https://github.com/git-guides/install-git).
###### Java 11
Java is used by the opensearch-migrations repo and Gradle, its associated build tool. The current required version is Java 11.

Expand Down Expand Up @@ -78,8 +78,6 @@ export MIGRATION_DOMAIN_USER_NAME=admin
export MIGRATION_DOMAIN_USER_SECRET_ARN=arn:aws:secretsmanager:us-east-1:123456789123:secret:demo-user-secret-123abc
export MIGRATION_VPC_ID=vpc-123;
export MIGRATION_CAPTURE_MSK_SG_ID=sg-123;
export MIGRATION_COMPARATOR_EFS_ID=fs-123;
export MIGRATION_COMPARATOR_EFS_SG_ID=sg-123;
export MIGRATION_REPLAYER_OUTPUT_EFS_ID=fs-124
export MIGRATION_REPLAYER_OUTPUT_EFS_SG_ID=sg-124
export MIGRATION_PUBLIC_SUBNETS=subnet-123,subnet-124;
Expand All @@ -88,7 +86,7 @@ export MIGRATION_KAFKA_BROKER_ENDPOINTS=b-1-public.loggingmskcluster.123.45.kafk
```
Additionally, if not using the deploy script, the following export is needed for the Replayer service:
```
export MIGRATION_REPLAYER_COMMAND=/bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer $MIGRATION_DOMAIN_ENDPOINT --insecure --kafka-traffic-brokers $MIGRATION_KAFKA_BROKER_ENDPOINTS --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group --kafka-traffic-enable-msk-auth --aws-auth-header-user $MIGRATION_DOMAIN_USER_NAME --aws-auth-header-secret $MIGRATION_DOMAIN_USER_SECRET_ARN | nc traffic-comparator 9220"
export MIGRATION_REPLAYER_COMMAND=/bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer $MIGRATION_DOMAIN_ENDPOINT --insecure --kafka-traffic-brokers $MIGRATION_KAFKA_BROKER_ENDPOINTS --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group --kafka-traffic-enable-msk-auth --aws-auth-header-user $MIGRATION_DOMAIN_USER_NAME --aws-auth-header-secret $MIGRATION_DOMAIN_USER_SECRET_ARN"
```

#### Setting up existing Copilot infrastructure
Expand All @@ -115,8 +113,6 @@ copilot env init --name dev

// Initialize services with their respective required name
copilot svc init --name traffic-replayer
copilot svc init --name traffic-comparator
copilot svc init --name traffic-comparator-jupyter
copilot svc init --name capture-proxy-es
copilot svc init --name migration-console

Expand All @@ -132,8 +128,6 @@ Currently, it seems that Copilot does not support deploying all services at once
copilot env deploy --name dev

// Deploy services to a deployed environment
copilot svc deploy --name traffic-comparator-jupyter --env dev
copilot svc deploy --name traffic-comparator --env dev
copilot svc deploy --name traffic-replayer --env dev
copilot svc deploy --name capture-proxy-es --env dev
copilot svc deploy --name migration-console --env dev
Expand Down Expand Up @@ -164,8 +158,6 @@ curl https://$MIGRATION_DOMAIN_ENDPOINT:443/_cat/indices?v --insecure -u admin:A

A command shell can be opened in the service's container if that service has enabled `exec: true` in its `manifest.yml` and the SSM Session Manager plugin is installed when prompted.
```
copilot svc exec -a migration-copilot -e dev -n traffic-comparator-jupyter -c "bash"
copilot svc exec -a migration-copilot -e dev -n traffic-comparator -c "bash"
copilot svc exec -a migration-copilot -e dev -n traffic-replayer -c "bash"
copilot svc exec -a migration-copilot -e dev -n elasticsearch -c "bash"
copilot svc exec -a migration-copilot -e dev -n capture-proxy -c "bash"
Expand Down
Loading
Loading