Skip to content

Commit

Permalink
Add indexAllowlist to snapshot create command (#1009)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <[email protected]>
  • Loading branch information
mikaylathompson authored Sep 30, 2024
1 parent 10fa760 commit 7ba296e
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 25 deletions.
10 changes: 8 additions & 2 deletions CreateSnapshot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ plugins {
id 'application'
id 'java'
id 'io.freefair.lombok' version '8.6'
id 'org.opensearch.migrations.java-application-conventions'
}

import org.opensearch.migrations.common.CommonUtils

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

Expand All @@ -18,6 +17,13 @@ dependencies {
implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'org.slf4j', name: 'slf4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'

testImplementation testFixtures(project(":RFS"))
testImplementation testFixtures(project(":coreUtilities"))
testImplementation testFixtures(project(":testHelperFixtures"))
testImplementation group: 'org.testcontainers', name: 'junit-jupiter'
testImplementation group: 'io.netty', name: 'netty-all'
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
}

application {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations;

import java.util.List;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
Expand All @@ -26,41 +28,67 @@
@Slf4j
public class CreateSnapshot {
public static class Args {
@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
@Parameter(
names = {"--help", "-h"},
help = true,
description = "Displays information about how to use this tool")
private boolean help;

@Parameter(names = { "--snapshot-name" }, required = true, description = "The name of the snapshot to migrate")
@Parameter(
names = { "--snapshot-name" },
required = true,
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {
"--file-system-repo-path" }, required = false, description = "The full path to the snapshot repo on the file system.")
@Parameter(
names = {"--file-system-repo-path" },
required = false,
description = "The full path to the snapshot repo on the file system.")
public String fileSystemRepoPath;

@Parameter(names = {
"--s3-repo-uri" }, required = false, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
@Parameter(
names = {"--s3-repo-uri" },
required = false,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;

@Parameter(names = {
"--s3-region" }, required = false, description = "The AWS Region the S3 bucket is in, like: us-east-2")
@Parameter(
names = {"--s3-region" },
required = false,
description = "The AWS Region the S3 bucket is in, like: us-east-2")
public String s3Region;

@ParametersDelegate
public ConnectionContext.SourceArgs sourceArgs = new ConnectionContext.SourceArgs();

@Parameter(names = {
"--no-wait" }, description = "Optional. If provided, the snapshot runner will not wait for completion")
@Parameter(
names = {"--no-wait" },
description = "Optional. If provided, the snapshot runner will not wait for completion")
public boolean noWait = false;

@Parameter(names = {
"--max-snapshot-rate-mb-per-node" }, required = false, description = "The maximum snapshot rate in megabytes per second per node")
@Parameter(
names = {"--max-snapshot-rate-mb-per-node" },
required = false,
description = "The maximum snapshot rate in megabytes per second per node")
public Integer maxSnapshotRateMBPerNode;

@Parameter(names = {
"--s3-role-arn" }, required = false, description = "The role ARN the cluster will assume to write a snapshot to S3")
@Parameter(
names = {"--s3-role-arn" },
required = false,
description = "The role ARN the cluster will assume to write a snapshot to S3")
public String s3RoleArn;

@Parameter(required = false, names = {
"--otel-collector-endpoint" }, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
@Parameter(
names = {"--index-allowlist"},
required = false,
description = "A comma separated list of indices to include in the snapshot. If not provided, all indices are included.")
public List<String> indexAllowlist = List.of();

@Parameter(
required = false,
names = {"--otel-collector-endpoint" },
arity = 1,
description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
+ "forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;
}
Expand Down Expand Up @@ -114,6 +142,7 @@ public void run() {
arguments.snapshotName,
client,
arguments.fileSystemRepoPath,
arguments.indexAllowlist,
context
);
} else {
Expand All @@ -122,6 +151,7 @@ public void run() {
client,
arguments.s3RepoUri,
arguments.s3Region,
arguments.indexAllowlist,
arguments.maxSnapshotRateMBPerNode,
arguments.s3RoleArn,
context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.opensearch.migrations;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.testutils.SimpleHttpResponse;
import org.opensearch.migrations.testutils.SimpleNettyHttpServer;

import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;

@Slf4j
public class TestCreateSnapshot {
final SnapshotTestContext snapshotContext = SnapshotTestContext.factory().noOtelTracking();
final byte[] payloadBytes = "Success".getBytes(StandardCharsets.UTF_8);
final Map<String, String> headers = Map.of(
"Content-Type",
"text/plain",
"Content-Length",
"" + payloadBytes.length
);

@Test
public void testRepoRegisterAndSnapshotCreateRequests() throws Exception {
var snapshotName = "my_snap";

ArrayList<Map.Entry<FullHttpRequest, String>> capturedRequestList = new ArrayList<>();
try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false,
Duration.ofMinutes(10),
fl -> {
capturedRequestList.add(new AbstractMap.SimpleEntry<>(fl, fl.content().toString(StandardCharsets.UTF_8)));
return new SimpleHttpResponse(headers, payloadBytes, "OK", 200);
}))
{
final var endpoint = destinationServer.localhostEndpoint().toString();

var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(endpoint)
.insecure(true)
.build()
.toConnectionContext());
var snapshotCreator = new S3SnapshotCreator(
snapshotName,
sourceClient,
"s3://new-bucket/path-to-repo",
"us-east-2",
List.of(),
snapshotContext.createSnapshotCreateContext()
);
SnapshotRunner.run(snapshotCreator);

FullHttpRequest registerRepoRequest = capturedRequestList.get(0).getKey();
String registerRepoRequestContent = capturedRequestList.get(0).getValue();
FullHttpRequest createSnapshotRequest = capturedRequestList.get(1).getKey();
String createSnapshotRequestContent = capturedRequestList.get(1).getValue();

Assertions.assertEquals("/_snapshot/migration_assistant_repo", registerRepoRequest.uri());
Assertions.assertEquals(HttpMethod.PUT, registerRepoRequest.method());

ObjectMapper objectMapper = new ObjectMapper();

// Parse both JSON strings into JsonNode objects
JsonNode actualRegisterRepoRequest = objectMapper.readTree(registerRepoRequestContent);
JsonNode expectedRegisterRepoRequest = objectMapper
.createObjectNode()
.put("type", "s3")
.set("settings", objectMapper.createObjectNode()
.put("bucket", "new-bucket")
.put("region", "us-east-2")
.put("base_path", "path-to-repo"));

Assertions.assertEquals(expectedRegisterRepoRequest, actualRegisterRepoRequest);

JsonNode actualCreateSnapshotRequest = objectMapper.readTree(createSnapshotRequestContent);
JsonNode expectedCreateSnapshotRequest = objectMapper.createObjectNode()
.put("indices", "_all")
.put("ignore_unavailable", true)
.put("include_global_state", true);

Assertions.assertEquals("/_snapshot/migration_assistant_repo/" + snapshotName, createSnapshotRequest.uri());
Assertions.assertEquals(HttpMethod.PUT, createSnapshotRequest.method());
Assertions.assertEquals(expectedCreateSnapshotRequest, actualCreateSnapshotRequest);
}
}

@Test
public void testSnapshotCreateWithIndexAllowlist() throws Exception {
var snapshotName = "my_snap";
var indexAllowlist = List.of("allowed_index_1", "allowed_index_2");

final AtomicReference<FullHttpRequest> createSnapshotRequest = new AtomicReference<>();
final AtomicReference<String> createSnapshotRequestContent = new AtomicReference<>();
try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false,
Duration.ofMinutes(10),
fl -> {
if (fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) {
createSnapshotRequest.set(fl);
createSnapshotRequestContent.set(fl.content().toString(StandardCharsets.UTF_8));
}
return new SimpleHttpResponse(headers, payloadBytes, "OK", 200);
}))
{
final var endpoint = destinationServer.localhostEndpoint().toString();

var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(endpoint)
.insecure(true)
.build()
.toConnectionContext());
var snapshotCreator = new S3SnapshotCreator(
snapshotName,
sourceClient,
"s3://new-bucket",
"us-east-2",
indexAllowlist,
snapshotContext.createSnapshotCreateContext()
);
SnapshotRunner.run(snapshotCreator);

Assertions.assertEquals(HttpMethod.PUT, createSnapshotRequest.get().method());
assertThat(createSnapshotRequestContent.get(), containsString(String.join(",", indexAllowlist)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private void migrateFrom_ES(
snapshotName,
sourceClient,
SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
List.of(),
snapshotContext.createSnapshotCreateContext()
);
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private void migrateFrom_ES(
snapshotName,
sourceClient,
SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
List.of(),
snapshotContext.createSnapshotCreateContext()
);
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand All @@ -14,9 +16,10 @@ public FileSystemSnapshotCreator(
String snapshotName,
OpenSearchClient client,
String snapshotRepoDirectoryPath,
List<String> indexAllowlist,
IRfsContexts.ICreateSnapshotContext context
) {
super(snapshotName, client, context);
super(snapshotName, indexAllowlist, client, context);
this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand All @@ -18,21 +20,23 @@ public S3SnapshotCreator(
OpenSearchClient client,
String s3Uri,
String s3Region,
List<String> indexAllowlist,
IRfsContexts.ICreateSnapshotContext context
) {
this(snapshotName, client, s3Uri, s3Region, null, null, context);
this(snapshotName, client, s3Uri, s3Region, indexAllowlist, null, null, context);
}

public S3SnapshotCreator(
String snapshotName,
OpenSearchClient client,
String s3Uri,
String s3Region,
List<String> indexAllowlist,
Integer maxSnapshotRateMBPerNode,
String snapshotRoleArn,
IRfsContexts.ICreateSnapshotContext context
) {
super(snapshotName, client, context);
super(snapshotName, indexAllowlist, client, context);
this.s3Uri = s3Uri;
this.s3Region = s3Region;
this.maxSnapshotRateMBPerNode = maxSnapshotRateMBPerNode;
Expand Down
Loading

0 comments on commit 7ba296e

Please sign in to comment.