migrateFrom_ES_v6_8 is working!
Signed-off-by: Peter Nied <[email protected]>
peternied committed Jun 27, 2024
1 parent e45f1b3 commit c09409b
Showing 10 changed files with 216 additions and 49 deletions.
@@ -94,10 +94,10 @@ public static void removeIntermediateIndexSettingsLevel(ObjectNode root) {
}

/**
* If allocation awareness is enabled, we need to ensure that the number of copies of our data matches the dimensionality.
* As a specific example, if you spin up a cluster spread across 3 availability zones and your awareness attribute is "zone",
* then the dimensionality would be 3. This means you need to ensure the number of total copies is a multiple of 3, with
* the minimum number of replicas being 2.
*/
public static void fixReplicasForDimensionality(ObjectNode root, int dimensionality) {
if (root.has("settings")) {
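    // For illustration only (a hypothetical helper, not the collapsed method body from this
    // commit): the arithmetic the Javadoc above describes. Total copies (primary + replicas)
    // must be a multiple of the awareness dimensionality, so replicas are rounded up.
    static int roundedUpReplicaCount(int currentReplicas, int dimensionality) {
        int totalCopies = currentReplicas + 1;
        int remainder = totalCopies % dimensionality;
        if (remainder == 0) {
            return currentReplicas;
        }
        // Round total copies up to the next multiple, then convert back to a replica count.
        return totalCopies + (dimensionality - remainder) - 1;
    }
    // Example: with dimensionality 3, 0 or 1 replicas become 2 (3 copies) and 3 replicas become 5 (6 copies).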
@@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.version_es_6_8.IndexMetadataData_ES_6_8;
import com.rfs.version_os_2_11.GlobalMetadataData_OS_2_11;

import org.opensearch.migrations.transformation.TransformationRule;
@@ -32,19 +33,16 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
ObjectNode newRoot = mapper.createObjectNode();

// Transform the original "templates", but put them into the legacy "templates" bucket on the target
if (root.get("templates") != null) {
ObjectNode templatesRoot = (ObjectNode) root.get("templates").deepCopy();
templatesRoot.fieldNames().forEachRemaining(templateName -> {
ObjectNode template = (ObjectNode) templatesRoot.get(templateName);
logger.info("Transforming template: " + templateName);
logger.debug("Original template: " + template.toString());
TransformFunctions.removeIntermediateMappingsLevels(template);
TransformFunctions.removeIntermediateIndexSettingsLevel(template); // run before fixNumberOfReplicas
TransformFunctions.fixReplicasForDimensionality(template, awarenessAttributeDimensionality);
logger.debug("Transformed template: " + template.toString());
templatesRoot.set(templateName, template);
var originalTemplates = root.get("templates");
if (originalTemplates != null) {
var templates = mapper.createObjectNode();
originalTemplates.fieldNames().forEachRemaining(templateName -> {
var templateCopy = (ObjectNode) originalTemplates.get(templateName).deepCopy();
var indexTemplate = (Index) () -> templateCopy;
tranformIndex(indexTemplate);
templates.set(templateName, indexTemplate.raw());
});
newRoot.set("templates", templatesRoot);
newRoot.set("templates", templates);
}

// Make empty index_templates
@@ -64,19 +62,21 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {

    @Override
    public IndexMetadata transformIndexMetadata(IndexMetadata index) {
        var copy = index.deepCopy();
        tranformIndex(copy);
        return copy;
    }

    private void tranformIndex(Index index) {
        logger.debug("Original Object: " + index.raw().toPrettyString());
        var newRoot = index.raw();

        TransformFunctions.removeIntermediateMappingsLevels(newRoot);
        indexTransformations.forEach(transformer -> transformer.applyTransformation(index));

        newRoot.set("settings", TransformFunctions.convertFlatSettingsToTree((ObjectNode) newRoot.get("settings")));
        TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); // run before fixNumberOfReplicas
        TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality);

        logger.debug("Transformed Object: " + newRoot.toPrettyString());
    }
}
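The template handling above reuses the index transformation path by casting a lambda to Index, so each template's JSON flows through the same rules as index metadata. A minimal sketch of that pattern follows, under the assumption (implied by the cast in the diff) that Index is a functional interface whose single method raw() returns the backing ObjectNode; transformTemplateNode and its parameters are illustrative names, not code from this commit, and it relies on the ObjectNode, TransformationRule, Index, and java.util.List imports already present in the file.

    // Sketch only: wrap a template's raw JSON as an Index so index-level rules can be applied to it.
    static ObjectNode transformTemplateNode(ObjectNode templateNode,
                                            List<TransformationRule<Index>> rules) {
        // Copy the template JSON, expose it through the Index interface, and run the same
        // rules that are applied to index metadata.
        ObjectNode copy = templateNode.deepCopy();
        Index wrapper = () -> copy;
        rules.forEach(rule -> rule.applyTransformation(wrapper));
        return wrapper.raw();
    }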
@@ -29,7 +29,7 @@ public Optional<ObjectNode> create(IndexMetadata index, String indexName, String
// Assemble the request body
ObjectNode body = mapper.createObjectNode();
body.set("aliases", indexMetadata.getAliases());
body.set("mappings", indexMetadata.getMappings());
body.set("mappings", indexMetadata.raw().get("mappings"));
body.set("settings", settings);

// Create the index; it's fine if it already exists
45 changes: 45 additions & 0 deletions RFS/src/test/java/com/rfs/framework/ClusterOperations.java
@@ -4,9 +4,12 @@
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -25,6 +28,48 @@ public ClusterOperations(final String clusterUrl) {
httpClient = HttpClients.createDefault();
}

public void createTemplate(final String templateName) throws IOException {
final var templateJson =
"{\r\n" + //
" \"index_patterns\": [\r\n" + //
" \"te*\",\r\n" + //
" \"bar*\"\r\n" + //
" ],\r\n" + //
" \"settings\": {\r\n" + //
" \"number_of_shards\": 1\r\n" + //
" },\r\n" + //
" \"aliases\": {\r\n" + //
" \"alias1\": {}\r\n" + //
" },\r\n" + //
" \"mappings\": {\r\n" + //
" \"_doc\": {\r\n" + //
" \"_source\": {\r\n" + //
" \"enabled\": false\r\n" + //
" },\r\n" + //
" \"properties\": {\r\n" + //
" \"host_name\": {\r\n" + //
" \"type\": \"keyword\"\r\n" + //
" },\r\n" + //
" \"created_at\": {\r\n" + //
" \"type\": \"date\",\r\n" + //
" \"format\": \"EEE MMM dd HH:mm:ss Z yyyy\"\r\n" + //
" }\r\n" + //
" }\r\n" + //
" }\r\n" + //
" }\r\n" + //
"}";

final var createRepoRequest = new HttpPut(clusterUrl + "/_template/" + templateName);
createRepoRequest.setEntity(new StringEntity(templateJson));
createRepoRequest.setHeader("Content-Type", "application/json");

try (var response = httpClient.execute(createRepoRequest)) {
assertThat(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), response.getCode(), equalTo(200));
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
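    // Sketch of a possible companion check, not part of this commit: read the template back
    // through the legacy _template endpoint to confirm the cluster accepted it. Assumes
    // org.apache.hc.client5.http.classic.methods.HttpGet is imported alongside HttpPut.
    public void verifyTemplateExists(final String templateName) throws IOException {
        final var getTemplateRequest = new HttpGet(clusterUrl + "/_template/" + templateName);

        try (var response = httpClient.execute(getTemplateRequest)) {
            assertThat(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), response.getCode(), equalTo(200));
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }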

public void createSnapshotRepository() throws IOException {
// Create snapshot repository
final var repositoryJson = "{\n" +
@@ -35,4 +35,4 @@ public default void fullMigrationViaLocalSnapshot(final String targetClusterUrl)

public void updateTargetCluster(final List<IndexMetadata> indices, final Path unpackedShardDataDir, final OpenSearchClient client) throws Exception;

}
}
67 changes: 64 additions & 3 deletions RFS/src/test/java/com/rfs/integration/EndToEndTest.java
@@ -1,13 +1,38 @@
package com.rfs.integration;

import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import com.rfs.common.ClusterVersion;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.SnapshotRepo;
import com.rfs.framework.ClusterOperations;
import com.rfs.framework.ElasticsearchContainer;
import com.rfs.framework.OpenSearchContainer;
import com.rfs.framework.OpenSearchContainer.Version;
import com.rfs.framework.SimpleRestoreFromSnapshot;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_6_8.GlobalMetadataFactory_ES_6_8;
import com.rfs.version_es_6_8.IndexMetadataFactory_ES_6_8;
import com.rfs.version_es_6_8.SnapshotRepoProvider_ES_6_8;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
import com.rfs.worker.SnapshotRunner;

import lombok.extern.slf4j.Slf4j;

@@ -17,13 +42,13 @@
@Slf4j
public class EndToEndTest {

protected Object sourceCluster;
protected Object targetCluster;
@TempDir
private File localDirectory;

protected SimpleRestoreFromSnapshot simpleRfsInstance;

@ParameterizedTest(name = "Target OpenSearch {0}")
@ArgumentsSource(SupportedTargetCluster.class)
@Disabled
public void migrateFrom_ES_v6_8(final OpenSearchContainer.Version targetVersion) throws Exception {
// Setup
// PSEUDO: Create a source cluster running ES 6.8
@@ -42,6 +67,42 @@ public void migrateFrom_ES_v6_8(final OpenSearchContainer.Version targetVersion)
// - 29x data-rolling
// - 5x geonames docs into playground
// - 7x geopoint into playground2
try (final var sourceCluster = new ElasticsearchContainer(ElasticsearchContainer.V6_8_23);
final var targetCluster = new OpenSearchContainer(targetVersion)) {
// Start the cluster for testing
var bothClustersStarted = CompletableFuture.allOf(
CompletableFuture.runAsync(() -> sourceCluster.start()),
CompletableFuture.runAsync(() -> targetCluster.start()));
bothClustersStarted.join();

var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl());
sourceClusterOperations.createTemplate("my_template_foo");
sourceClusterOperations.createDocument("barstool", "222", "{\"hi\":\"yay\"}");

var snapshotName = "my_snap";

var sourceClient = new OpenSearchClient(sourceCluster.getUrl(), null, null, true);
var snapshotCreator = new FileSystemSnapshotCreator(snapshotName, sourceClient, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR);
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);

sourceCluster.copySnapshotData(localDirectory.toString());

var sourceRepo = new FileSystemRepo(localDirectory.toPath());
var targetClient = new OpenSearchClient(targetCluster.getUrl(), null, null, true);

SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_6_8(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_6_8(repoDataProvider);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, null, null, null);
Transformer transformer =
TransformFunctions.getTransformer(ClusterVersion.ES_6_8, ClusterVersion.OS_2_11, 1);
new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_6_8(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, List.of()).migrateIndices();
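
            // (Sketch, not part of this commit.) A possible verification step while both containers
            // are still running: after MetadataRunner and IndexRunner complete, the migrated index
            // and the legacy template should exist on the target. Uses Apache HttpClient plus the
            // Hamcrest assertThat/equalTo helpers, whose imports would need to be added to this test.
            try (var verificationClient = HttpClients.createDefault()) {
                var indexCheck = new HttpGet(targetCluster.getUrl() + "/barstool");
                try (var indexResponse = verificationClient.execute(indexCheck)) {
                    assertThat(indexResponse.getCode(), equalTo(200));
                }
                var templateCheck = new HttpGet(targetCluster.getUrl() + "/_template/my_template_foo");
                try (var templateResponse = verificationClient.execute(templateCheck)) {
                    assertThat(templateResponse.getCode(), equalTo(200));
                }
            }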
}



// PSEUDO: Create a target cluster running OS 2.X (Where x is the latest released version)

@@ -15,7 +15,7 @@ public class SupportedTargetCluster implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
Arguments.of(OpenSearchContainer.Version.V1_3_15),
// Arguments.of(OpenSearchContainer.Version.V1_3_15),
Arguments.of(OpenSearchContainer.Version.V2_14_0)
);
}
@@ -16,6 +16,8 @@ public class ElasticsearchContainer implements AutoCloseable {
public static final String CLUSTER_SNAPSHOT_DIR = "/usr/share/elasticsearch/snapshots";
public static final Version V7_10_2 =
new Version("docker.elastic.co/elasticsearch/elasticsearch:7.10.2", "7.10.2");
public static final Version V6_8_23 =
new Version("docker.elastic.co/elasticsearch/elasticsearch:6.8.23", "6.8.23");

private final GenericContainer<?> container;
private final Version version;
@@ -0,0 +1,58 @@
package org.opensearch.migrations.transformation.rules;

import java.util.Map.Entry;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.TransformationRule;
import org.opensearch.migrations.transformation.Version.Product;
import org.opensearch.migrations.transformation.Version;
import org.opensearch.migrations.transformation.VersionRange;
import org.opensearch.migrations.transformation.entity.Index;


/**
 * Replaces a "mappings" value stored as a single-element array (as pre-7.x Elasticsearch index
 * metadata presents it) with the object it contains, so later steps see a plain mappings object.
 */
public class IndexMappingNotArray implements TransformationRule<Index> {

@Override
public VersionRange supportedSourceVersionRange() {
return new VersionRange(
new Version(Product.ELASTICSEARCH, 0, 0, 0),
new Version(Product.ELASTICSEARCH, 7, 0, 0)
);
}

@Override
public VersionRange supportedTargetVersionRange() {
return new VersionRange(
new Version(Product.OPENSEARCH, 0, 0, 0),
new Version(Product.OPENSEARCH, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE)
);
}

@Override
public CanApplyResult canApply(final Index index) {
final var mappingNode = index.raw().get("mappings");
if (mappingNode.isNull() || mappingNode.size() > 1) {
return CanApplyResult.UNSUPPORTED;
}

if (mappingNode.isObject()) {
return CanApplyResult.NO;
}

return CanApplyResult.YES;
}

@Override
public boolean applyTransformation(final Index index) {
if (CanApplyResult.YES != canApply(index)) {
return false;
}

final var mappingsNode = index.raw().get("mappings");
final var inner = mappingsNode.get(0);

index.raw().set("mappings", inner);

return true;
}
}
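To make the new rule's behavior concrete, here is a small self-contained sketch that is not part of the commit: it feeds the rule a hand-built node whose mappings element is wrapped in a single-element array, the shape this rule expects from older Elasticsearch metadata. IndexMappingNotArraySketch is an illustrative class name, and the lambda relies on the same assumption as the transformer diff above, namely that Index's single abstract method is raw().

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.migrations.transformation.entity.Index;
import org.opensearch.migrations.transformation.rules.IndexMappingNotArray;

public class IndexMappingNotArraySketch {
    public static void main(String[] args) throws Exception {
        var mapper = new ObjectMapper();
        // The mappings object is wrapped in a one-element array, as the rule expects.
        var root = (ObjectNode) mapper.readTree(
                "{\"mappings\":[{\"properties\":{\"host_name\":{\"type\":\"keyword\"}}}]}");
        Index index = () -> root;

        var applied = new IndexMappingNotArray().applyTransformation(index);

        // applied == true; "mappings" is now the inner object instead of a one-element array.
        System.out.println(applied + " -> " + index.raw().toString());
    }
}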