diff --git a/CreateSnapshot/build.gradle b/CreateSnapshot/build.gradle index 14a25540c..3ac514578 100644 --- a/CreateSnapshot/build.gradle +++ b/CreateSnapshot/build.gradle @@ -2,10 +2,9 @@ plugins { id 'application' id 'java' id 'io.freefair.lombok' version '8.6' + id 'org.opensearch.migrations.java-application-conventions' } -import org.opensearch.migrations.common.CommonUtils - java.sourceCompatibility = JavaVersion.VERSION_11 java.targetCompatibility = JavaVersion.VERSION_11 @@ -18,6 +17,13 @@ dependencies { implementation group: 'org.jcommander', name: 'jcommander' implementation group: 'org.slf4j', name: 'slf4j-api' implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl' + + testImplementation testFixtures(project(":RFS")) + testImplementation testFixtures(project(":coreUtilities")) + testImplementation testFixtures(project(":testHelperFixtures")) + testImplementation group: 'org.testcontainers', name: 'junit-jupiter' + testImplementation group: 'io.netty', name: 'netty-all' + testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind' } application { diff --git a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java index 98ddfcc3d..f0cd1c589 100644 --- a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java @@ -1,5 +1,7 @@ package org.opensearch.migrations; +import java.util.List; + import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator; import org.opensearch.migrations.bulkload.common.OpenSearchClient; import org.opensearch.migrations.bulkload.common.S3SnapshotCreator; @@ -26,41 +28,67 @@ @Slf4j public class CreateSnapshot { public static class Args { - @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool") + @Parameter( + names = {"--help", "-h"}, + help = true, + description = "Displays information about how to use this tool") private boolean help; - @Parameter(names = { "--snapshot-name" }, required = true, description = "The name of the snapshot to migrate") + @Parameter( + names = { "--snapshot-name" }, + required = true, + description = "The name of the snapshot to migrate") public String snapshotName; - @Parameter(names = { - "--file-system-repo-path" }, required = false, description = "The full path to the snapshot repo on the file system.") + @Parameter( + names = {"--file-system-repo-path" }, + required = false, + description = "The full path to the snapshot repo on the file system.") public String fileSystemRepoPath; - @Parameter(names = { - "--s3-repo-uri" }, required = false, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2") + @Parameter( + names = {"--s3-repo-uri" }, + required = false, + description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2") public String s3RepoUri; - @Parameter(names = { - "--s3-region" }, required = false, description = "The AWS Region the S3 bucket is in, like: us-east-2") + @Parameter( + names = {"--s3-region" }, + required = false, + description = "The AWS Region the S3 bucket is in, like: us-east-2") public String s3Region; @ParametersDelegate public ConnectionContext.SourceArgs sourceArgs = new ConnectionContext.SourceArgs(); - @Parameter(names = { - "--no-wait" }, description = "Optional. If provided, the snapshot runner will not wait for completion") + @Parameter( + names = {"--no-wait" }, + description = "Optional. If provided, the snapshot runner will not wait for completion") public boolean noWait = false; - @Parameter(names = { - "--max-snapshot-rate-mb-per-node" }, required = false, description = "The maximum snapshot rate in megabytes per second per node") + @Parameter( + names = {"--max-snapshot-rate-mb-per-node" }, + required = false, + description = "The maximum snapshot rate in megabytes per second per node") public Integer maxSnapshotRateMBPerNode; - @Parameter(names = { - "--s3-role-arn" }, required = false, description = "The role ARN the cluster will assume to write a snapshot to S3") + @Parameter( + names = {"--s3-role-arn" }, + required = false, + description = "The role ARN the cluster will assume to write a snapshot to S3") public String s3RoleArn; - @Parameter(required = false, names = { - "--otel-collector-endpoint" }, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be" + @Parameter( + names = {"--index-allowlist"}, + required = false, + description = "A comma separated list of indices to include in the snapshot. If not provided, all indices are included.") + public List indexAllowlist = List.of(); + + @Parameter( + required = false, + names = {"--otel-collector-endpoint" }, + arity = 1, + description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be" + "forwarded. If no value is provided, metrics will not be forwarded.") String otelCollectorEndpoint; } @@ -114,6 +142,7 @@ public void run() { arguments.snapshotName, client, arguments.fileSystemRepoPath, + arguments.indexAllowlist, context ); } else { @@ -122,6 +151,7 @@ public void run() { client, arguments.s3RepoUri, arguments.s3Region, + arguments.indexAllowlist, arguments.maxSnapshotRateMBPerNode, arguments.s3RoleArn, context diff --git a/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java b/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java new file mode 100644 index 000000000..88fdb65b8 --- /dev/null +++ b/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java @@ -0,0 +1,143 @@ +package org.opensearch.migrations; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.S3SnapshotCreator; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; +import org.opensearch.migrations.bulkload.worker.SnapshotRunner; +import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; +import org.opensearch.migrations.testutils.SimpleHttpResponse; +import org.opensearch.migrations.testutils.SimpleNettyHttpServer; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import lombok.extern.slf4j.Slf4j; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; + +@Slf4j +public class TestCreateSnapshot { + final SnapshotTestContext snapshotContext = SnapshotTestContext.factory().noOtelTracking(); + final byte[] payloadBytes = "Success".getBytes(StandardCharsets.UTF_8); + final Map headers = Map.of( + "Content-Type", + "text/plain", + "Content-Length", + "" + payloadBytes.length + ); + + @Test + public void testRepoRegisterAndSnapshotCreateRequests() throws Exception { + var snapshotName = "my_snap"; + + ArrayList> capturedRequestList = new ArrayList<>(); + try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false, + Duration.ofMinutes(10), + fl -> { + capturedRequestList.add(new AbstractMap.SimpleEntry<>(fl, fl.content().toString(StandardCharsets.UTF_8))); + return new SimpleHttpResponse(headers, payloadBytes, "OK", 200); + })) + { + final var endpoint = destinationServer.localhostEndpoint().toString(); + + var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(endpoint) + .insecure(true) + .build() + .toConnectionContext()); + var snapshotCreator = new S3SnapshotCreator( + snapshotName, + sourceClient, + "s3://new-bucket/path-to-repo", + "us-east-2", + List.of(), + snapshotContext.createSnapshotCreateContext() + ); + SnapshotRunner.run(snapshotCreator); + + FullHttpRequest registerRepoRequest = capturedRequestList.get(0).getKey(); + String registerRepoRequestContent = capturedRequestList.get(0).getValue(); + FullHttpRequest createSnapshotRequest = capturedRequestList.get(1).getKey(); + String createSnapshotRequestContent = capturedRequestList.get(1).getValue(); + + Assertions.assertEquals("/_snapshot/migration_assistant_repo", registerRepoRequest.uri()); + Assertions.assertEquals(HttpMethod.PUT, registerRepoRequest.method()); + + ObjectMapper objectMapper = new ObjectMapper(); + + // Parse both JSON strings into JsonNode objects + JsonNode actualRegisterRepoRequest = objectMapper.readTree(registerRepoRequestContent); + JsonNode expectedRegisterRepoRequest = objectMapper + .createObjectNode() + .put("type", "s3") + .set("settings", objectMapper.createObjectNode() + .put("bucket", "new-bucket") + .put("region", "us-east-2") + .put("base_path", "path-to-repo")); + + Assertions.assertEquals(expectedRegisterRepoRequest, actualRegisterRepoRequest); + + JsonNode actualCreateSnapshotRequest = objectMapper.readTree(createSnapshotRequestContent); + JsonNode expectedCreateSnapshotRequest = objectMapper.createObjectNode() + .put("indices", "_all") + .put("ignore_unavailable", true) + .put("include_global_state", true); + + Assertions.assertEquals("/_snapshot/migration_assistant_repo/" + snapshotName, createSnapshotRequest.uri()); + Assertions.assertEquals(HttpMethod.PUT, createSnapshotRequest.method()); + Assertions.assertEquals(expectedCreateSnapshotRequest, actualCreateSnapshotRequest); + } + } + + @Test + public void testSnapshotCreateWithIndexAllowlist() throws Exception { + var snapshotName = "my_snap"; + var indexAllowlist = List.of("allowed_index_1", "allowed_index_2"); + + final AtomicReference createSnapshotRequest = new AtomicReference<>(); + final AtomicReference createSnapshotRequestContent = new AtomicReference<>(); + try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false, + Duration.ofMinutes(10), + fl -> { + if (fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) { + createSnapshotRequest.set(fl); + createSnapshotRequestContent.set(fl.content().toString(StandardCharsets.UTF_8)); + } + return new SimpleHttpResponse(headers, payloadBytes, "OK", 200); + })) + { + final var endpoint = destinationServer.localhostEndpoint().toString(); + + var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(endpoint) + .insecure(true) + .build() + .toConnectionContext()); + var snapshotCreator = new S3SnapshotCreator( + snapshotName, + sourceClient, + "s3://new-bucket", + "us-east-2", + indexAllowlist, + snapshotContext.createSnapshotCreateContext() + ); + SnapshotRunner.run(snapshotCreator); + + Assertions.assertEquals(HttpMethod.PUT, createSnapshotRequest.get().method()); + assertThat(createSnapshotRequestContent.get(), containsString(String.join(",", indexAllowlist))); + } + } +} diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java index 6b814e53a..77104bf8b 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java @@ -107,6 +107,7 @@ private void migrateFrom_ES( snapshotName, sourceClient, SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, + List.of(), snapshotContext.createSnapshotCreateContext() ); SnapshotRunner.runAndWaitForCompletion(snapshotCreator); diff --git a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java index 876a26ae2..8a7f7248b 100644 --- a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java +++ b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java @@ -155,6 +155,7 @@ private void migrateFrom_ES( snapshotName, sourceClient, SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, + List.of(), snapshotContext.createSnapshotCreateContext() ); SnapshotRunner.runAndWaitForCompletion(snapshotCreator); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/FileSystemSnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/FileSystemSnapshotCreator.java index b877f0eb8..a0b8f1cb0 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/FileSystemSnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/FileSystemSnapshotCreator.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.bulkload.common; +import java.util.List; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -14,9 +16,10 @@ public FileSystemSnapshotCreator( String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath, + List indexAllowlist, IRfsContexts.ICreateSnapshotContext context ) { - super(snapshotName, client, context); + super(snapshotName, indexAllowlist, client, context); this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java index 04925760d..58c7b7111 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3SnapshotCreator.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.bulkload.common; +import java.util.List; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -18,9 +20,10 @@ public S3SnapshotCreator( OpenSearchClient client, String s3Uri, String s3Region, + List indexAllowlist, IRfsContexts.ICreateSnapshotContext context ) { - this(snapshotName, client, s3Uri, s3Region, null, null, context); + this(snapshotName, client, s3Uri, s3Region, indexAllowlist, null, null, context); } public S3SnapshotCreator( @@ -28,11 +31,12 @@ public S3SnapshotCreator( OpenSearchClient client, String s3Uri, String s3Region, + List indexAllowlist, Integer maxSnapshotRateMBPerNode, String snapshotRoleArn, IRfsContexts.ICreateSnapshotContext context ) { - super(snapshotName, client, context); + super(snapshotName, indexAllowlist, client, context); this.s3Uri = s3Uri; this.s3Region = s3Region; this.maxSnapshotRateMBPerNode = maxSnapshotRateMBPerNode; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java index 6e9d1becc..9d8e53af4 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java @@ -1,5 +1,6 @@ package org.opensearch.migrations.bulkload.common; +import java.util.List; import java.util.Optional; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -23,9 +24,12 @@ public abstract class SnapshotCreator { private final IRfsContexts.ICreateSnapshotContext context; @Getter private final String snapshotName; + private final List indexAllowlist; - protected SnapshotCreator(String snapshotName, OpenSearchClient client, IRfsContexts.ICreateSnapshotContext context) { + protected SnapshotCreator(String snapshotName, List indexAllowlist, OpenSearchClient client, + IRfsContexts.ICreateSnapshotContext context) { this.snapshotName = snapshotName; + this.indexAllowlist = indexAllowlist; this.client = client; this.context = context; } @@ -36,6 +40,14 @@ public String getRepoName() { return "migration_assistant_repo"; } + public String getIndexAllowlist() { + if (this.indexAllowlist == null || this.indexAllowlist.isEmpty()) { + return "_all"; + } else { + return String.join(",", this.indexAllowlist); + } + } + public void registerRepo() { ObjectNode settings = getRequestBodyForRegisterRepo(); @@ -52,7 +64,7 @@ public void registerRepo() { public void createSnapshot() { // Assemble the settings ObjectNode body = mapper.createObjectNode(); - body.put("indices", "_all"); + body.put("indices", this.getIndexAllowlist()); body.put("ignore_unavailable", true); body.put("include_global_state", true); @@ -91,8 +103,8 @@ public boolean isSnapshotFinished() { return false; } else { log.atError().setMessage("Snapshot {} has failed with state {}") - .addArgument(snapshotName) - .addArgument(state).log(); + .addArgument(snapshotName) + .addArgument(state).log(); throw new SnapshotCreationFailed(snapshotName); } }