Merge branch 'main' into TrackableCompletableFutures
* main: (57 commits)
  [Index configuration tool] Add support for insecure HTTPS endpoint (opensearch-project#218)
  Refactor inner classes from ExpiringTrafficStreamMap into separate classes, plus one other refactoring to simplify data encapsulation between the main map itself and each ExpiringKeyQueue.
  Move ExpiringTrafficStreamMap file to a new package to accommodate refactoring in the next commit.
  Make getOrCreateNodeMap() less pessimistic and add a TODO for work that needs to be done to remove (or at least describe) an obscure and rare race condition.
  [index configuration tool] Change to parsing Data Prepper pipeline YAML as input (opensearch-project#215)
  changing argument and variable names + updating description
  run on any .py changes within the top cluster_migration_core directory
  run lint on all .py changes
  Removing accidentally included kafka.properties file
  Migrations-1150 - Enhance Setting Kafka Properties for Kafka Puller
  Disabling python related workflows for non-python directories
  Using wildcards to add coverage report files instead of adding individual files
  removing finalized
  renaming workflow + no longer double executing tests
  Introduce Copilot Deployments (opensearch-project#201)
  Wildcard isn't supported - add each cov file manually
  run gradle build instead of assemble
  Update codecov version
  Add github workflow
  Bump aws-cdk-lib in /deployment/cdk/opensearch-service-migration (opensearch-project#206)
  ...

Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ExpiringTrafficStreamMap.java
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator
gregschohn committed Jun 28, 2023
2 parents 8c2e006 + a42a702 commit b0f0255
Showing 56 changed files with 2,006 additions and 876 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/CI.yml
@@ -1,6 +1,12 @@
 name: CI
 
-on: [push, pull_request]
+on:
+  push:
+    paths:
+      - '**.py'
+  pull_request:
+    paths:
+      - '**.py'
 
 jobs:
   lint:
31 changes: 31 additions & 0 deletions .github/workflows/gradle-build-and-test.yml
@@ -0,0 +1,31 @@
+name: Gradle Build and Test
+
+on: [push, pull_request]
+
+jobs:
+  gradle-build-and-test:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up JDK 11
+      uses: actions/setup-java@v2
+      with:
+        java-version: '11'
+        distribution: 'adopt'
+
+    - name: Run Gradle Build
+      run: ./gradlew build -x test
+      working-directory: TrafficCapture
+
+    - name: Run Tests with Coverage
+      run: ./gradlew test jacocoTestReport --info
+      working-directory: TrafficCapture
+
+    - name: Upload to Codecov
+      uses: codecov/codecov-action@v3
+      with:
+        files: "TrafficCapture/**/jacocoTestReport.xml"
+        flags: unittests
+        fail_ci_if_error: false
8 changes: 7 additions & 1 deletion .github/workflows/python-tests.yml
@@ -1,6 +1,12 @@
 name: python-tests
 
-on: [push, pull_request]
+on:
+  push:
+    paths:
+      - 'cluster_migration_core/**.py'
+  pull_request:
+    paths:
+      - 'cluster_migration_core/**.py'
 
 jobs:
   test-linux:
51 changes: 50 additions & 1 deletion README.md
@@ -12,11 +12,60 @@ This repo will contain code and documentation to assist in migrations and upgrades

Developers must run the "install_githooks.sh" script in order to add the pre-commit hook.

## Docker Solution

The TrafficCapture directory hosts a set of projects designed to facilitate proxying and capturing HTTP
traffic, which can then be offloaded and replayed to other HTTP servers.

More documentation on this solution can be found here:
[TrafficCapture README](TrafficCapture/README.md)

### End-to-End Testing

Developers can run a test script that verifies the end-to-end Docker Solution.
#### Pre-requisites

* All containers from the Docker solution must be running.

To run the test script, navigate to the [test directory](test/),
install the required packages, and then run the script:

```
cd test
pip install -r requirements.txt
pytest tests.py
```

#### Notes
##### Ports Setup
The test script, by default, uses the ports assigned to the containers in this
[docker-compose file](TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml), so if the Docker solution in
its current setup started with no issues, the test script will run as is. If the ports in that file have been changed,
the environment variables `PROXY_ENDPOINT`, `SOURCE_ENDPOINT`, `TARGET_ENDPOINT`, and `JUPYTER_NOTEBOOK` must be
changed to match, or their default values (listed below) must be updated in [tests.py](test/tests.py); a sketch of the
override pattern follows the list below.

The following are the default values for the only endpoints touched by this script:
* `PROXY_ENDPOINT = https://localhost:9200`
* `SOURCE_ENDPOINT = http://localhost:19200`
* `TARGET_ENDPOINT = https://localhost:29200`
* `JUPYTER_NOTEBOOK = http://localhost:8888/api`
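
For illustration, the following sketch shows how these defaults can be overridden through the environment. The
variable names and default values come from the list above; the `os.environ.get` fallback pattern is an assumption
about how [tests.py](test/tests.py) resolves them, not a copy of it:

```python
import os

# Each endpoint falls back to its docker-compose default when the
# corresponding environment variable is unset.
PROXY_ENDPOINT = os.environ.get("PROXY_ENDPOINT", "https://localhost:9200")
SOURCE_ENDPOINT = os.environ.get("SOURCE_ENDPOINT", "http://localhost:19200")
TARGET_ENDPOINT = os.environ.get("TARGET_ENDPOINT", "https://localhost:29200")
JUPYTER_NOTEBOOK = os.environ.get("JUPYTER_NOTEBOOK", "http://localhost:8888/api")
```
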
#### Clean Up
The test script implements setup and teardown functions that run after each and every test; any additions a test made
to the endpoints are deleted there, so the tests *mostly* clean up after themselves. However, because all operations
going through the proxy (which is capturing the traffic) are logged, those entries are only deleted after the Docker
solution is shut down. A sketch of the teardown pattern follows.
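
For illustration only, here is a minimal pytest sketch of that setup/teardown pattern; the fixture name and its
bookkeeping list are hypothetical rather than taken from [tests.py](test/tests.py):

```python
import pytest
import requests


@pytest.fixture(autouse=True)
def cleanup_endpoint_additions():
    # Hypothetical bookkeeping: a test appends (base_url, resource_path)
    # pairs for anything it creates on the endpoints.
    created = []
    yield created
    # Teardown runs after each and every test: delete what the test added.
    # verify=False accommodates the self-signed HTTPS endpoints above.
    for base_url, resource_path in created:
        requests.delete(f"{base_url}/{resource_path}", verify=False)
```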

## Deploying to AWS with Copilot

The containerized services in this repo can be deployed to AWS using [Copilot](https://aws.github.io/copilot-cli/).

Documentation for getting started and deploying these services can be found [here](deployment/copilot/README.md).

## Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

## License

This project is licensed under the Apache-2.0 License.

28 changes: 28 additions & 0 deletions TrafficCapture/README.md
@@ -0,0 +1,28 @@
## Running the Docker Solution

While in the TrafficCapture directory, run the following command:

`./gradlew :dockerSolution:composeUp`

## Compatibility

The tools in this directory can only be built if you have Java version 11 installed.

The version is specified in `TrafficCapture/build.gradle` using a Java toolchain, which allows us
to decouple the Java version used by Gradle itself from the Java version used by the tools here.

Any attempt to use a different version will cause the build to fail with an error like the one below (the exact
message depends on which tool/project is being built). The example shows the error printed when running, e.g.,
`./gradlew trafficCaptureProxyServer:build`:

```
* What went wrong:
A problem occurred evaluating project ':trafficCaptureProxyServer'.
> Could not resolve all dependencies for configuration ':trafficCaptureProxyServer:opensearchSecurityPlugin'.
> Failed to calculate the value of task ':trafficCaptureProxyServer:compileJava' property 'javaCompiler'.
> No matching toolchains found for requested specification: {languageVersion=10, vendor=any, implementation=vendor-specific}.
> No locally installed toolchains match (see https://docs.gradle.org/8.0.2/userguide/toolchains.html#sec:auto_detection) and toolchain download repositories have not been configured (see https://docs.gradle.org/8.0.2/userguide/toolchains.html#sub:download_repositories).
```
8 changes: 8 additions & 0 deletions TrafficCapture/build.gradle
@@ -1,5 +1,6 @@
 allprojects {
     apply plugin: 'java'
+    apply plugin: 'jacoco'
 
     java {
         toolchain {
@@ -9,4 +10,11 @@ allprojects {
     test {
         jvmArgs '-ea'
     }
+
+    jacocoTestReport {
+        reports {
+            xml.required = true
+            xml.destination file("${buildDir}/reports/jacoco/test/jacocoTestReport.xml")
+        }
+    }
 }
7 changes: 6 additions & 1 deletion TrafficCapture/dockerSolution/build.gradle
@@ -76,7 +76,11 @@ trafficComparatorServices.forEach {projectName, dockerImageName ->
         dependsOn(tasks.getByName('cloneComparatorRepoIfNeeded'))
         from REALIZED_TRAFFIC_COMPARATOR_DIRECTORY
         into dockerBuildDir
-        include 'setup.py'
+        include '*.py'
+        include '/traffic_comparator/*'
+        if (projectName == 'jupyterNotebook') {
+            include '*.ipynb'
+        }
     }
 
     task "createDockerfile_${projectName}"(type: com.bmuschko.gradle.docker.tasks.image.Dockerfile) {
@@ -85,6 +89,7 @@
         from 'python:3.10.10'
         runCommand("apt-get update && apt-get install -y netcat lsof")
         copyFile("setup.py", "/setup.py")
+        copyFile(".", "/containerTC/")
         runCommand("pip3 install --editable \".[data]\"")
         // container stay-alive
         defaultCommand('tail', '-f', '/dev/null')
TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
@@ -7,7 +7,7 @@ services:
       - migrations
     ports:
       - "9200:9200"
-    command: /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --kafkaConnection kafka:9092 --destinationHost elasticsearch --destinationPort 9200 --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml
+    command: /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --kafkaConnection kafka:9092 --destinationUri http://elasticsearch:9200 --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml
     depends_on:
       - kafka
       - elasticsearch
@@ -13,6 +13,8 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.FileInputStream;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
@@ -36,6 +38,15 @@ static class Parameters {
             names = {"-g", "--group-id"},
             description = "Client id that should be used when communicating with the Kafka broker.")
     String clientGroupId;
+    @Parameter(required = false,
+            names = {"--enableMSKAuth"},
+            description = "Enables SASL properties required for connecting to MSK with IAM auth.")
+    boolean mskAuthEnabled = false;
+    @Parameter(required = false,
+            names = {"--kafkaConfigFile"},
+            arity = 1,
+            description = "Kafka properties file")
+    String kafkaPropertiesFile;
 }
 
 public static Parameters parseArgs(String[] args) {
@@ -65,16 +76,28 @@ public static void main(String[] args) {
         String topic = params.topicName;
 
         Properties properties = new Properties();
-        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        if (params.kafkaPropertiesFile != null) {
+            try (InputStream input = new FileInputStream(params.kafkaPropertiesFile)) {
+                properties.load(input);
+            } catch (IOException ex) {
+                log.error("Unable to load properties from kafka.properties file.");
+                return;
+            }
+        }
+        else {
+            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        }
         // Required for using SASL auth with MSK public endpoint
-        //properties.setProperty("security.protocol", "SASL_SSL");
-        //properties.setProperty("sasl.mechanism", "AWS_MSK_IAM");
-        //properties.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
-        //properties.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
+        if (params.mskAuthEnabled){
+            properties.setProperty("security.protocol", "SASL_SSL");
+            properties.setProperty("sasl.mechanism", "AWS_MSK_IAM");
+            properties.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
+            properties.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
+        }
 
         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
 
@@ -4,14 +4,16 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.protobuf.CodedOutputStream;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import lombok.NonNull;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.logging.log4j.core.util.NullOutputStream;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.migrations.trafficcapture.FileConnectionCaptureFactory;
 import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
 import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
 import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
 import org.opensearch.migrations.trafficcapture.kafkaoffloader.KafkaCaptureFactory;
@@ -20,9 +22,10 @@
 import org.opensearch.security.ssl.util.SSLConfigConstants;
 
 import javax.net.ssl.SSLEngine;
-import java.io.File;
+import javax.net.ssl.SSLException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Optional;
@@ -74,15 +77,15 @@ static class Parameters {
             description = "The maximum number of bytes that will be written to a single TrafficStream object.")
     int maximumTrafficStreamSize = 1024*1024;
     @Parameter(required = false,
-            names = {"--destinationHost"},
-            arity = 1,
-            description = "Hostname of the server that the proxy is capturing traffic for.")
-    String backsideHostname = "localhost";
+            names = {"--insecureDestination"},
+            arity = 0,
+            description = "Do not check the destination server's certificate")
+    boolean allowInsecureConnectionsToBackside;
     @Parameter(required = true,
-            names = {"--destinationPort"},
+            names = {"--destinationUri"},
             arity = 1,
-            description = "Port of the server that the proxy connects to.")
-    int backsidePort = 0;
+            description = "URI of the server that the proxy is capturing traffic for.")
+    String backsideUriString;
     @Parameter(required = true,
             names = {"--listenPort"},
             arity = 1,
@@ -179,11 +182,48 @@ private static IConnectionCaptureFactory getConnectionCaptureFactory(Parameters
         }
     }
 
+    // Utility method for converting uri string to an actual URI object. Similar logic is placed in the trafficReplayer
+    // module: TrafficReplayer.java
+    private static URI convertStringToUri(String uriString) {
+        URI serverUri;
+        try {
+            serverUri = new URI(uriString);
+        } catch (Exception e) {
+            System.err.println("Exception parsing URI string: " + uriString);
+            System.err.println(e.getMessage());
+            System.exit(3);
+            return null;
+        }
+        if (serverUri.getPort() < 0) {
+            throw new RuntimeException("Port not present for URI: " + serverUri);
+        }
+        if (serverUri.getHost() == null) {
+            throw new RuntimeException("Hostname not present for URI: " + serverUri);
+        }
+        if (serverUri.getScheme() == null) {
+            throw new RuntimeException("Scheme (http|https) is not present for URI: " + serverUri);
+        }
+        return serverUri;
+    }
+
+    private static SslContext loadBacksideSslContext(URI serverUri, boolean allowInsecureConnections) throws
+            SSLException {
+        if (serverUri.getScheme().equalsIgnoreCase("https")) {
+            var sslContextBuilder = SslContextBuilder.forClient();
+            if (allowInsecureConnections) {
+                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+            }
+            return sslContextBuilder.build();
+        } else {
+            return null;
+        }
+    }
+
     public static void main(String[] args) throws InterruptedException, IOException {
 
         var params = parseArgs(args);
+        var backsideUri = convertStringToUri(params.backsideUriString);
+
         // This should be added to the argument parser when added in
         var sksOp = Optional.ofNullable(params.sslConfigFilePath)
                 .map(sslConfigFile->new DefaultSecurityKeyStore(getSettings(sslConfigFile),
                         Paths.get(sslConfigFile).toAbsolutePath().getParent()));
@@ -192,7 +232,7 @@ public static void main(String[] args) throws InterruptedException, IOException
         var proxy = new NettyScanningHttpProxy(params.frontsidePort);
 
         try {
-            proxy.start(params.backsideHostname, params.backsidePort,
+            proxy.start(backsideUri, loadBacksideSslContext(backsideUri, params.allowInsecureConnectionsToBackside),
                     sksOp.map(sks-> (Supplier<SSLEngine>) () -> {
                         try {
                             var sslEngine = sks.createHTTPSSLEngine();