From e1cee1374fb0a337aa7d8052dbb7a3c8177ffc42 Mon Sep 17 00:00:00 2001 From: Peter Nied Date: Fri, 9 Aug 2024 22:35:10 +0000 Subject: [PATCH] Fully pull out version specific creators Signed-off-by: Peter Nied --- .../opensearch/migrations/MetadataArgs.java | 2 +- .../migrations/clusters/RemoteCluster.java | 19 ++++- .../migrations/clusters/SnapshotSource.java | 72 +++++++++++++++++- .../migrations/clusters/TargetCluster.java | 7 +- .../migrations/commands/Migrate.java | 16 ++-- .../migrations/commands/SnapshotSource.java | 74 ------------------- .../opensearch/migrations/EndToEndTest.java | 10 ++- .../version_os_2_11/IndexCreator_OS_2_11.java | 14 ++-- .../main/java/com/rfs/worker/IndexRunner.java | 6 +- .../migrations/metadata/IndexCreator.java | 15 ++++ 10 files changed, 134 insertions(+), 101 deletions(-) delete mode 100644 MetadataMigration/src/main/java/org/opensearch/migrations/commands/SnapshotSource.java create mode 100644 RFS/src/main/java/org/opensearch/migrations/metadata/IndexCreator.java diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataArgs.java b/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataArgs.java index b46cd8588..ebdf7a9d1 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataArgs.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataArgs.java @@ -34,7 +34,7 @@ public class MetadataArgs { public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs(); @ParametersDelegate - public DataFiltersArgs dataFiltersArgs = new DataFiltersArgs(); + public DataFiltersArgs dataFilterArgs = new DataFiltersArgs(); // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/ @Parameter(names = { diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/RemoteCluster.java b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/RemoteCluster.java index 127dd455f..bef63d1ff 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/RemoteCluster.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/RemoteCluster.java @@ -3,9 +3,11 @@ import java.util.List; import org.opensearch.migrations.DataFiltersArgs; +import org.opensearch.migrations.Flavor; import org.opensearch.migrations.Version; import org.opensearch.migrations.cli.Printer; import org.opensearch.migrations.metadata.GlobalMetadataCreator; +import org.opensearch.migrations.metadata.IndexCreator; import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts.IClusterMetadataContext; import lombok.AllArgsConstructor; @@ -13,6 +15,9 @@ import com.rfs.common.http.ConnectionContext; import com.rfs.models.GlobalMetadata.Factory; +import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; +import com.rfs.common.OpenSearchClient; @AllArgsConstructor public class RemoteCluster implements TargetCluster { @@ -36,7 +41,19 @@ public GlobalMetadataCreator getGlobalMetadataCreator( DataFiltersArgs dataFilters, IClusterMetadataContext context ) { - + if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { + return new GlobalMetadataCreator_OS_2_11(new OpenSearchClient(connection), dataFilters.indexAllowlist, dataFilters.indexTemplateAllowlist, dataFilters.componentTemplateAllowlist, context); + } + throw new UnsupportedOperationException("Unimplemented method 'getGlobalMetadataCreator'"); } + + @Override + public IndexCreator getIndexCreator() { + if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { + return new IndexCreator_OS_2_11(new OpenSearchClient(connection)); + } + + throw new UnsupportedOperationException("Unimplemented method 'getIndexCreator'"); + } } \ No newline at end of file diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/SnapshotSource.java b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/SnapshotSource.java index 0c0f47758..000ef6426 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/SnapshotSource.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/SnapshotSource.java @@ -1,5 +1,73 @@ package org.opensearch.migrations.clusters; -public class SnapshotSource { - +import java.nio.file.Path; + +import org.opensearch.migrations.Flavor; +import org.opensearch.migrations.Version; + +import com.rfs.models.GlobalMetadata; +import com.rfs.models.IndexMetadata; +import com.rfs.common.SnapshotRepo; +import com.rfs.common.S3Repo; +import com.rfs.common.SourceRepo; + +import com.rfs.common.FileSystemRepo; +import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; +import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; +import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; + +import com.rfs.common.S3Uri; + + +public class SnapshotSource implements SourceCluster { + + private final SourceRepo repo; + private final Version version; + + public SnapshotSource(Version version, String snapshotRepoPath) { + this.version = version; + this.repo = new FileSystemRepo(Path.of(snapshotRepoPath)); + } + + public SnapshotSource(Version version, String s3LocalDirPath, String s3RepoUri, String s3Region) { + this.version = version; + this.repo = S3Repo.create(Path.of(s3LocalDirPath), new S3Uri(s3RepoUri), s3Region); + } + + @Override + public Version getVersion() { + // Make sure the snapshot exists + // Else throw + // Read the snapshot file, get the version + // Else throw + // + return version; + } + + private SnapshotRepo.Provider getProvider() { + if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { + return new SnapshotRepoProvider_ES_7_10(this.repo); + } + + throw new UnsupportedOperationException("Unsupported version " + getVersion()); + } + + @Override + public GlobalMetadata.Factory getMetadata() { + if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { + return new GlobalMetadataFactory_ES_7_10(getProvider()); + } + + throw new UnsupportedOperationException("Unsupported version " + getVersion()); + } + + @Override + public IndexMetadata.Factory getIndexMetadata() { + if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { + return new IndexMetadataFactory_ES_7_10(getProvider()); + } + + throw new UnsupportedOperationException("Unsupported version " + getVersion()); + } + } diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/TargetCluster.java b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/TargetCluster.java index f18819423..bb107b69d 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/TargetCluster.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/clusters/TargetCluster.java @@ -1,11 +1,14 @@ package org.opensearch.migrations.clusters; -import java.util.List; import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts; +import org.opensearch.migrations.DataFiltersArgs; import org.opensearch.migrations.metadata.GlobalMetadataCreator; +import org.opensearch.migrations.metadata.IndexCreator; public interface TargetCluster { - GlobalMetadataCreator getGlobalMetadataCreator( + public GlobalMetadataCreator getGlobalMetadataCreator( DataFiltersArgs dataFilters, IMetadataMigrationContexts.IClusterMetadataContext context); + + public IndexCreator getIndexCreator(); } diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java index f93210594..b8047f57a 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java @@ -7,6 +7,7 @@ import org.opensearch.migrations.MetadataArgs; import org.opensearch.migrations.cli.Clusters; import org.opensearch.migrations.clusters.RemoteCluster; +import org.opensearch.migrations.clusters.SnapshotSource; import org.opensearch.migrations.clusters.SourceCluster; import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext; @@ -30,6 +31,7 @@ import com.rfs.worker.IndexRunner; import com.rfs.worker.MetadataRunner; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.metadata.GlobalMetadataCreator; @Slf4j public class Migrate { @@ -92,21 +94,19 @@ public MigrateResult execute(RootMetadataMigrationContext context) { context.createMetadataMigrationContext() ); final Transformer transformer = TransformFunctions.getTransformer( - sourceCluster.getVersion(), - targetCluster.getVersion(), + ClusterVersion.fromVersion(sourceCluster.getVersion()), + ClusterVersion.fromVersion(targetCluster.getVersion()), awarenessDimensionality ); - new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata(); + new MetadataRunner(snapshotName, sourceCluster.getMetadata(), metadataCreator, transformer).migrateMetadata(); log.info("Metadata copy complete."); - final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); new IndexRunner( snapshotName, - indexMetadataFactory, - indexCreator, + sourceCluster.getIndexMetadata(), + targetCluster.getIndexCreator(), transformer, - indexAllowlist, + arguments.dataFilterArgs.indexAllowlist, context.createIndexContext() ).migrateIndices(); log.info("Index copy complete."); diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/SnapshotSource.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/SnapshotSource.java deleted file mode 100644 index e7dea1edb..000000000 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/SnapshotSource.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.opensearch.migrations.commands; - -import java.nio.file.Path; - -import org.opensearch.migrations.Flavor; -import org.opensearch.migrations.Version; -import org.opensearch.migrations.clusters.SourceCluster; - -import com.rfs.models.GlobalMetadata; -import com.rfs.models.IndexMetadata; -import com.rfs.common.SnapshotRepo; -import com.rfs.common.S3Repo; -import com.rfs.common.SourceRepo; - -import com.rfs.common.FileSystemRepo; -import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; - -import com.rfs.common.S3Uri; - - -public class SnapshotSource implements SourceCluster { - - private final SourceRepo repo; - private final Version version; - - public SnapshotSource(Version version, String snapshotRepoPath) { - this.version = version; - this.repo = new FileSystemRepo(Path.of(snapshotRepoPath)); - } - - public SnapshotSource(Version version, String s3LocalDirPath, String s3RepoUri, String s3Region) { - this.version = version; - this.repo = S3Repo.create(Path.of(s3LocalDirPath), new S3Uri(s3RepoUri), s3Region); - } - - @Override - public Version getVersion() { - // Make sure the snapshot exists - // Else throw - // Read the snapshot file, get the version - // Else throw - // - return version; - } - - private SnapshotRepo.Provider getProvider() { - if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { - return new SnapshotRepoProvider_ES_7_10(this.repo); - } - - throw new UnsupportedOperationException("Unsupported version " + getVersion()); - } - - @Override - public GlobalMetadata.Factory getMetadata() { - if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { - return new GlobalMetadataFactory_ES_7_10(getProvider()); - } - - throw new UnsupportedOperationException("Unsupported version " + getVersion()); - } - - @Override - public IndexMetadata.Factory getIndexMetadata() { - if (version.equals(Version.builder().flavor(Flavor.Elasticsearch).major(7).minor(10).build())) { - return new IndexMetadataFactory_ES_7_10(getProvider()); - } - - throw new UnsupportedOperationException("Unsupported version " + getVersion()); - } - -} diff --git a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java index 7115fb0dc..1eea5c0f9 100644 --- a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java +++ b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java @@ -89,9 +89,13 @@ private void migrateFrom_ES_v7_X( arguments.sourceVersion = Version.fromString("ES 7.10"); arguments.fileSystemRepoPath = localDirectory.getAbsolutePath(); arguments.snapshotName = snapshotName; - arguments.indexAllowlist = List.of(indexName); - arguments.componentTemplateAllowlist = List.of(compoTemplateName); - arguments.indexTemplateAllowlist = List.of(indexTemplateName); + + var dataFilterArgs = new DataFiltersArgs(); + dataFilterArgs.indexAllowlist = List.of(indexName); + dataFilterArgs.componentTemplateAllowlist = List.of(compoTemplateName); + dataFilterArgs.indexTemplateAllowlist = List.of(indexTemplateName); + arguments.dataFilterArgs = dataFilterArgs; + arguments.targetArgs = targetArgs; arguments.targetVersion = Version.fromString("OS 2.14"); diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java index f78c046ae..95bc4f03a 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java @@ -12,8 +12,10 @@ import com.rfs.models.IndexMetadata; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.metadata.IndexCreator; + @Slf4j -public class IndexCreator_OS_2_11 { +public class IndexCreator_OS_2_11 implements IndexCreator { private static final ObjectMapper mapper = new ObjectMapper(); protected final OpenSearchClient client; @@ -23,11 +25,9 @@ public IndexCreator_OS_2_11(OpenSearchClient client) { public Optional create( IndexMetadata index, - String indexName, - String indexId, IMetadataMigrationContexts.ICreateIndexContext context ) { - IndexMetadataData_OS_2_11 indexMetadata = new IndexMetadataData_OS_2_11(index.rawJson(), indexId, indexName); + IndexMetadataData_OS_2_11 indexMetadata = new IndexMetadataData_OS_2_11(index.rawJson(), index.getId(), index.getName()); // Remove some settings which will cause errors if you try to pass them to the API ObjectNode settings = indexMetadata.getSettings(); @@ -45,7 +45,7 @@ public Optional create( // Create the index; it's fine if it already exists try { - return client.createIndex(indexName, body, context); + return client.createIndex(index.getName(), body, context); } catch (InvalidResponse invalidResponse) { var illegalArguments = invalidResponse.getIllegalArguments(); @@ -64,8 +64,8 @@ public Optional create( removeFieldsByPath(settings, shortenedIllegalArgument); } - log.info("Reattempting creation of index '" + indexName + "' after removing illegal arguments; " + illegalArguments); - return client.createIndex(indexName, body, context); + log.info("Reattempting creation of index '" + index.getName() + "' after removing illegal arguments; " + illegalArguments); + return client.createIndex(index.getName(), body, context); } } diff --git a/RFS/src/main/java/com/rfs/worker/IndexRunner.java b/RFS/src/main/java/com/rfs/worker/IndexRunner.java index a3d43724c..575368391 100644 --- a/RFS/src/main/java/com/rfs/worker/IndexRunner.java +++ b/RFS/src/main/java/com/rfs/worker/IndexRunner.java @@ -3,13 +3,13 @@ import java.util.List; import java.util.function.BiConsumer; +import org.opensearch.migrations.metadata.IndexCreator; import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts; import com.rfs.common.FilterScheme; import com.rfs.common.SnapshotRepo; import com.rfs.models.IndexMetadata; import com.rfs.transformers.Transformer; -import com.rfs.version_os_2_11.IndexCreator_OS_2_11; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -19,7 +19,7 @@ public class IndexRunner { private final String snapshotName; private final IndexMetadata.Factory metadataFactory; - private final IndexCreator_OS_2_11 indexCreator; + private final IndexCreator indexCreator; private final Transformer transformer; private final List indexAllowlist; private final IMetadataMigrationContexts.ICreateIndexContext context; @@ -39,7 +39,7 @@ public void migrateIndices() { .forEach(index -> { var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); var transformedRoot = transformer.transformIndexMetadata(indexMetadata); - var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId(), context); + var resultOp = indexCreator.create(transformedRoot, context); resultOp.ifPresentOrElse( value -> log.info("Index " + index.getName() + " created successfully"), () -> log.info("Index " + index.getName() + " already existed; no work required") diff --git a/RFS/src/main/java/org/opensearch/migrations/metadata/IndexCreator.java b/RFS/src/main/java/org/opensearch/migrations/metadata/IndexCreator.java new file mode 100644 index 000000000..66387ffcc --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/metadata/IndexCreator.java @@ -0,0 +1,15 @@ +package org.opensearch.migrations.metadata; + +import java.util.Optional; + +import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.rfs.models.IndexMetadata; + +public interface IndexCreator { + public Optional create( + IndexMetadata index, + IMetadataMigrationContexts.ICreateIndexContext context + ); +}