Skip to content

Commit

Permalink
Fully pull out version specific creators
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Aug 9, 2024
1 parent cde89c5 commit e1cee13
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class MetadataArgs {
public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

@ParametersDelegate
public DataFiltersArgs dataFiltersArgs = new DataFiltersArgs();
public DataFiltersArgs dataFilterArgs = new DataFiltersArgs();

// https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
import java.util.List;

import org.opensearch.migrations.DataFiltersArgs;
import org.opensearch.migrations.Flavor;
import org.opensearch.migrations.Version;
import org.opensearch.migrations.cli.Printer;
import org.opensearch.migrations.metadata.GlobalMetadataCreator;
import org.opensearch.migrations.metadata.IndexCreator;
import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts.IClusterMetadataContext;

import lombok.AllArgsConstructor;
import lombok.Builder;

import com.rfs.common.http.ConnectionContext;
import com.rfs.models.GlobalMetadata.Factory;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.common.OpenSearchClient;

@AllArgsConstructor
public class RemoteCluster implements TargetCluster {
Expand All @@ -36,7 +41,19 @@ public GlobalMetadataCreator getGlobalMetadataCreator(
DataFiltersArgs dataFilters,
IClusterMetadataContext context
) {

if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) {
return new GlobalMetadataCreator_OS_2_11(new OpenSearchClient(connection), dataFilters.indexAllowlist, dataFilters.indexTemplateAllowlist, dataFilters.componentTemplateAllowlist, context);
}

throw new UnsupportedOperationException("Unimplemented method 'getGlobalMetadataCreator'");
}

@Override
public IndexCreator getIndexCreator() {
if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) {
return new IndexCreator_OS_2_11(new OpenSearchClient(connection));
}

throw new UnsupportedOperationException("Unimplemented method 'getIndexCreator'");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,73 @@
package org.opensearch.migrations.clusters;

public class SnapshotSource {

import java.nio.file.Path;

import org.opensearch.migrations.Flavor;
import org.opensearch.migrations.Version;

import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.S3Repo;
import com.rfs.common.SourceRepo;

import com.rfs.common.FileSystemRepo;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;

import com.rfs.common.S3Uri;


public class SnapshotSource implements SourceCluster {

private final SourceRepo repo;
private final Version version;

public SnapshotSource(Version version, String snapshotRepoPath) {
this.version = version;
this.repo = new FileSystemRepo(Path.of(snapshotRepoPath));
}

public SnapshotSource(Version version, String s3LocalDirPath, String s3RepoUri, String s3Region) {
this.version = version;
this.repo = S3Repo.create(Path.of(s3LocalDirPath), new S3Uri(s3RepoUri), s3Region);
}

@Override
public Version getVersion() {
// Make sure the snapshot exists
// Else throw
// Read the snapshot file, get the version
// Else throw
//
return version;
}

private SnapshotRepo.Provider getProvider() {
if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) {
return new SnapshotRepoProvider_ES_7_10(this.repo);
}

throw new UnsupportedOperationException("Unsupported version " + getVersion());
}

@Override
public GlobalMetadata.Factory getMetadata() {
if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) {
return new GlobalMetadataFactory_ES_7_10(getProvider());
}

throw new UnsupportedOperationException("Unsupported version " + getVersion());
}

@Override
public IndexMetadata.Factory getIndexMetadata() {
if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) {
return new IndexMetadataFactory_ES_7_10(getProvider());
}

throw new UnsupportedOperationException("Unsupported version " + getVersion());
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.opensearch.migrations.clusters;

import java.util.List;
import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts;
import org.opensearch.migrations.DataFiltersArgs;
import org.opensearch.migrations.metadata.GlobalMetadataCreator;
import org.opensearch.migrations.metadata.IndexCreator;

public interface TargetCluster {
GlobalMetadataCreator getGlobalMetadataCreator(
public GlobalMetadataCreator getGlobalMetadataCreator(
DataFiltersArgs dataFilters,
IMetadataMigrationContexts.IClusterMetadataContext context);

public IndexCreator getIndexCreator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.migrations.MetadataArgs;
import org.opensearch.migrations.cli.Clusters;
import org.opensearch.migrations.clusters.RemoteCluster;
import org.opensearch.migrations.clusters.SnapshotSource;
import org.opensearch.migrations.clusters.SourceCluster;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;

Expand All @@ -30,6 +31,7 @@
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.metadata.GlobalMetadataCreator;

@Slf4j
public class Migrate {
Expand Down Expand Up @@ -92,21 +94,19 @@ public MigrateResult execute(RootMetadataMigrationContext context) {
context.createMetadataMigrationContext()
);
final Transformer transformer = TransformFunctions.getTransformer(
sourceCluster.getVersion(),
targetCluster.getVersion(),
ClusterVersion.fromVersion(sourceCluster.getVersion()),
ClusterVersion.fromVersion(targetCluster.getVersion()),
awarenessDimensionality
);
new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
new MetadataRunner(snapshotName, sourceCluster.getMetadata(), metadataCreator, transformer).migrateMetadata();
log.info("Metadata copy complete.");

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
new IndexRunner(
snapshotName,
indexMetadataFactory,
indexCreator,
sourceCluster.getIndexMetadata(),
targetCluster.getIndexCreator(),
transformer,
indexAllowlist,
arguments.dataFilterArgs.indexAllowlist,
context.createIndexContext()
).migrateIndices();
log.info("Index copy complete.");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ private void migrateFrom_ES_v7_X(
arguments.sourceVersion = Version.fromString("ES 7.10");
arguments.fileSystemRepoPath = localDirectory.getAbsolutePath();
arguments.snapshotName = snapshotName;
arguments.indexAllowlist = List.of(indexName);
arguments.componentTemplateAllowlist = List.of(compoTemplateName);
arguments.indexTemplateAllowlist = List.of(indexTemplateName);

var dataFilterArgs = new DataFiltersArgs();
dataFilterArgs.indexAllowlist = List.of(indexName);
dataFilterArgs.componentTemplateAllowlist = List.of(compoTemplateName);
dataFilterArgs.indexTemplateAllowlist = List.of(indexTemplateName);
arguments.dataFilterArgs = dataFilterArgs;

arguments.targetArgs = targetArgs;
arguments.targetVersion = Version.fromString("OS 2.14");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import com.rfs.models.IndexMetadata;
import lombok.extern.slf4j.Slf4j;

import org.opensearch.migrations.metadata.IndexCreator;

@Slf4j
public class IndexCreator_OS_2_11 {
public class IndexCreator_OS_2_11 implements IndexCreator {
private static final ObjectMapper mapper = new ObjectMapper();
protected final OpenSearchClient client;

Expand All @@ -23,11 +25,9 @@ public IndexCreator_OS_2_11(OpenSearchClient client) {

public Optional<ObjectNode> create(
IndexMetadata index,
String indexName,
String indexId,
IMetadataMigrationContexts.ICreateIndexContext context
) {
IndexMetadataData_OS_2_11 indexMetadata = new IndexMetadataData_OS_2_11(index.rawJson(), indexId, indexName);
IndexMetadataData_OS_2_11 indexMetadata = new IndexMetadataData_OS_2_11(index.rawJson(), index.getId(), index.getName());

// Remove some settings which will cause errors if you try to pass them to the API
ObjectNode settings = indexMetadata.getSettings();
Expand All @@ -45,7 +45,7 @@ public Optional<ObjectNode> create(

// Create the index; it's fine if it already exists
try {
return client.createIndex(indexName, body, context);
return client.createIndex(index.getName(), body, context);
} catch (InvalidResponse invalidResponse) {
var illegalArguments = invalidResponse.getIllegalArguments();

Expand All @@ -64,8 +64,8 @@ public Optional<ObjectNode> create(
removeFieldsByPath(settings, shortenedIllegalArgument);
}

log.info("Reattempting creation of index '" + indexName + "' after removing illegal arguments; " + illegalArguments);
return client.createIndex(indexName, body, context);
log.info("Reattempting creation of index '" + index.getName() + "' after removing illegal arguments; " + illegalArguments);
return client.createIndex(index.getName(), body, context);
}
}

Expand Down
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/worker/IndexRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import java.util.List;
import java.util.function.BiConsumer;

import org.opensearch.migrations.metadata.IndexCreator;
import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts;

import com.rfs.common.FilterScheme;
import com.rfs.common.SnapshotRepo;
import com.rfs.models.IndexMetadata;
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -19,7 +19,7 @@ public class IndexRunner {

private final String snapshotName;
private final IndexMetadata.Factory metadataFactory;
private final IndexCreator_OS_2_11 indexCreator;
private final IndexCreator indexCreator;
private final Transformer transformer;
private final List<String> indexAllowlist;
private final IMetadataMigrationContexts.ICreateIndexContext context;
Expand All @@ -39,7 +39,7 @@ public void migrateIndices() {
.forEach(index -> {
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
var transformedRoot = transformer.transformIndexMetadata(indexMetadata);
var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId(), context);
var resultOp = indexCreator.create(transformedRoot, context);
resultOp.ifPresentOrElse(
value -> log.info("Index " + index.getName() + " created successfully"),
() -> log.info("Index " + index.getName() + " already existed; no work required")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.opensearch.migrations.metadata;

import java.util.Optional;

import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.models.IndexMetadata;

public interface IndexCreator {
public Optional<ObjectNode> create(
IndexMetadata index,
IMetadataMigrationContexts.ICreateIndexContext context
);
}

0 comments on commit e1cee13

Please sign in to comment.