From 874d92d1c4efc9854629ae785f71fd84c3652abc Mon Sep 17 00:00:00 2001
From: Chris Helma <25470211+chelma@users.noreply.github.com>
Date: Fri, 12 Jul 2024 10:21:28 -0500
Subject: [PATCH] Removed unused all-in-one RFS scripts; made tests less
 verbose (#813)

* Removed unused all-in-one RFS scripts; made tests less verbose

Signed-off-by: Chris Helma <25470211+chelma@users.noreply.github.com>

* Updated RFS Test logging per suggestions

Signed-off-by: Chris Helma <25470211+chelma@users.noreply.github.com>

---------

Signed-off-by: Chris Helma <25470211+chelma@users.noreply.github.com>
---
 .../java/com/rfs/RfsMigrateDocuments.java     |   2 +-
 RFS/build.gradle                              |  24 --
 .../java/com/rfs/ReindexFromSnapshot.java     | 383 ------------------
 RFS/src/main/java/com/rfs/RunRfsWorker.java   | 173 --------
 RFS/src/test/resources/log4j2.properties      |  25 ++
 RFS/src/test/resources/log4j2.xml             |  23 --
 6 files changed, 26 insertions(+), 604 deletions(-)
 delete mode 100644 RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
 delete mode 100644 RFS/src/main/java/com/rfs/RunRfsWorker.java
 create mode 100644 RFS/src/test/resources/log4j2.properties
 delete mode 100644 RFS/src/test/resources/log4j2.xml

diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java
index 1709895f2..bd4a08117 100644
--- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java
+++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java
@@ -145,7 +145,7 @@ public static void main(String[] args) throws Exception {

         try (var processManager = new LeaseExpireTrigger(workItemId->{
-            log.error("Terminating RunRfsWorker because its lease has expired for " + workItemId);
+            log.error("Terminating RfsMigrateDocuments because its lease has expired for " + workItemId);
             System.exit(PROCESS_TIMED_OUT);
         }, Clock.systemUTC())) {
             var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
diff --git a/RFS/build.gradle b/RFS/build.gradle
index 73b1a1080..2bace1b7d 100644
--- a/RFS/build.gradle
+++ b/RFS/build.gradle
@@ -69,30 +69,6 @@ dependencies {
     testFixturesImplementation group: 'org.hamcrest', name: 'hamcrest'
 }

-application {
-    mainClassName = 'com.rfs.ReindexFromSnapshot'
-}
-
-task runRfsWorker (type: JavaExec) {
-    classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'com.rfs.RunRfsWorker'
-}
-
-task createSnapshot (type: JavaExec) {
-    classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'com.rfs.RfsCreateSnapshot'
-}
-
-task migrateMetadata (type: JavaExec) {
-    classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'com.rfs.RfsMigrateMetadata'
-}
-
-task migrateDocuments (type: JavaExec) {
-    classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'com.rfs.RfsMigrateDocuments'
-}
-
 test {
     useJUnitPlatform {
         excludeTags 'longTest'
diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
deleted file mode 100644
index 4d59461c5..000000000
--- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package com.rfs;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParametersDelegate;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.lucene.document.Document;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-import reactor.core.publisher.Flux;
-
-import com.rfs.common.*;
-import com.rfs.models.GlobalMetadata;
-import com.rfs.models.IndexMetadata;
-import com.rfs.models.ShardMetadata;
-import com.rfs.models.SnapshotMetadata;
-import com.rfs.transformers.*;
-import com.rfs.version_es_6_8.*;
-import com.rfs.version_es_7_10.*;
-import com.rfs.version_os_2_11.*;
-
-public class ReindexFromSnapshot {
-    private static final Logger logger = LogManager.getLogger(ReindexFromSnapshot.class);
-
-    public static class Args {
-        @Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
-        public String snapshotName;
-
-        @Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the existing source snapshot directory on local disk", required = false)
-        public String snapshotDirPath = null;
-
-        @Parameter(names = {"--snapshot-local-repo-dir"}, description = "The absolute path at which to take and store a new snapshot on the source; this location must be accessible by both the source and this app", required = false)
-        public String snapshotLocalRepoDirPath = null;
-
-        @Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = false)
-        public String s3LocalDirPath = null;
-
-        @Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = false)
-        public String s3RepoUri = null;
-
-        @Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = false)
-        public String s3Region = null;
-
-        @Parameter(names = {"-l", "--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
-        public String luceneDirPath;
-
-        @ParametersDelegate
-        public ConnectionDetails.SourceArgs sourceArgs;
-
-        @ParametersDelegate
-        public ConnectionDetails.TargetArgs targetArgs;
-
-        @Parameter(names = {"-s", "--source-version"}, description = "The source cluster's version (e.g. 'es_6_8')", required = true, converter = ClusterVersion.ArgsConverter.class)
-        public ClusterVersion sourceVersion;
-
-        @Parameter(names = {"-t", "--target-version"}, description = "The target cluster's version (e.g. 'os_2_11')", required = true, converter = ClusterVersion.ArgsConverter.class)
-        public ClusterVersion targetVersion;
-
-        @Parameter(names = {"--movement-type"}, description = "What you want to move - everything, metadata, or data. Default: 'everything'", required = false, converter = MovementType.ArgsConverter.class)
-        public MovementType movementType = MovementType.EVERYTHING;
-
-        // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
-        @Parameter(names = {"--min-replicas"}, description = "The minimum number of replicas configured for migrated indices on the target. This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements", required = true)
-        public int minNumberOfReplicas;
-
-        @Parameter(names = {"--template-whitelist"}, description = "List of template names to migrate. Note: For ES 6.8 this refers to legacy templates and for ES 7.10 this is index templates (e.g. 'posts_index_template1, posts_index_template2')", required = false)
-        public List<String> templateWhitelist;
-
-        @Parameter(names = {"--component-template-whitelist"}, description = "List of component template names to migrate (e.g. 'posts_template1, posts_template2')", required = false)
-        public List<String> componentTemplateWhitelist;
-
-        @Parameter(names = {"--enable-persistent-run"}, description = "If enabled, the Java process will continue in an idle mode after the migration is completed. Default: false", arity=0, required = false)
-        public boolean enablePersistentRun;
-
-        @Parameter(names = {"--log-level"}, description = "What log level you want. Default: 'info'", required = false, converter = Logging.ArgsConverter.class)
-        public Level logLevel = Level.INFO;
-
-        @Parameter(names = {"--index_suffix"}, description = "An optional suffix to add to index names as they're transferred. Default: none", required = false)
-        public String indexSuffix = "";
-    }
-
-    public static void main(String[] args) throws InterruptedException {
-        // Parse out the args
-        Args arguments = new Args();
-        JCommander.newBuilder()
-            .addObject(arguments)
-            .build()
-            .parse(args);
-
-        String snapshotName = arguments.snapshotName;
-        Path snapshotDirPath = (arguments.snapshotDirPath != null) ? Paths.get(arguments.snapshotDirPath) : null;
-        Path snapshotLocalRepoDirPath = (arguments.snapshotLocalRepoDirPath != null) ? Paths.get(arguments.snapshotLocalRepoDirPath) : null;
-        Path s3LocalDirPath = (arguments.s3LocalDirPath != null) ? Paths.get(arguments.s3LocalDirPath) : null;
-        String s3RepoUri = arguments.s3RepoUri;
-        String s3Region = arguments.s3Region;
-        Path luceneDirPath = Paths.get(arguments.luceneDirPath);
-        int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
-        ClusterVersion sourceVersion = arguments.sourceVersion;
-        ClusterVersion targetVersion = arguments.targetVersion;
-        List<String> templateWhitelist = arguments.templateWhitelist;
-        List<String> componentTemplateWhitelist = arguments.componentTemplateWhitelist;
-        MovementType movementType = arguments.movementType;
-        Level logLevel = arguments.logLevel;
-        String indexSuffix = arguments.indexSuffix;
-
-        Logging.setLevel(logLevel);
-
-        ConnectionDetails sourceConnection = new ConnectionDetails(arguments.sourceArgs);
-        ConnectionDetails targetConnection = new ConnectionDetails(arguments.targetArgs);
-
-        // Sanity checks
-        if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) {
-            throw new IllegalArgumentException("Unsupported source version: " + sourceVersion);
-        }
-
-        if (targetVersion != ClusterVersion.OS_2_11) {
-            throw new IllegalArgumentException("Unsupported target version: " + targetVersion);
-        }
-
-        /*
-         * You have three options for providing the snapshot data:
-         * 1. A local snapshot directory
-         * 2. A source host we'll take the snapshot from
-         * 3. An S3 URI of an existing snapshot in S3
-         *
-         * If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
-         */
-        if (snapshotDirPath != null && (arguments.sourceArgs.getHost() != null || s3RepoUri != null)) {
-            throw new IllegalArgumentException("If you specify a local directory to take the snapshot from, you cannot specify a source host or S3 URI");
-        } else if (arguments.sourceArgs.getHost() != null) {
-            if (s3RepoUri == null && s3Region == null && s3LocalDirPath == null && snapshotLocalRepoDirPath == null) {
-                throw new IllegalArgumentException(
-                    "If you specify a source host, you must also specify the S3 details or the snapshotLocalRepoDirPath to write the snapshot to");
-            }
-            if ((s3RepoUri != null || s3Region != null || s3LocalDirPath != null) &&
-                (s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) {
-                throw new IllegalArgumentException(
-                    "You must specify all S3 details (repo URI, region, local directory path)");
-            }
-        }
-
-        SourceRepo repo;
-        if (snapshotDirPath != null) {
-            repo = new FileSystemRepo(snapshotDirPath);
-        } else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
-            repo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
-        } else if (snapshotLocalRepoDirPath != null) {
-            repo = new FileSystemRepo(snapshotLocalRepoDirPath);
-        } else {
-            throw new IllegalArgumentException("Could not construct a source repo from the available, user-supplied arguments");
-        }
-
-        // Set the transformer
-        Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessDimensionality);
-
-        try {
-
-            if (arguments.sourceArgs.getHost() != null) {
-                // ==========================================================================================================
-                // Create the snapshot if necessary
-                // ==========================================================================================================
-                logger.info("==================================================================");
-                logger.info("Attempting to create the snapshot...");
-                OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
-                SnapshotCreator snapshotCreator = repo instanceof S3Repo
-                    ? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region)
-                    : new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString());
-                snapshotCreator.registerRepo();
-                snapshotCreator.createSnapshot();
-                while (!snapshotCreator.isSnapshotFinished()) {
-                    logger.info("Snapshot not finished yet; sleeping for 5 seconds...");
-                    Thread.sleep(5000);
-                }
-                logger.info("Snapshot created successfully");
-            }
-
-            // ==========================================================================================================
-            // Read the Repo data file
-            // ==========================================================================================================
-            logger.info("==================================================================");
-            logger.info("Attempting to read Repo data file...");
-            SnapshotRepo.Provider repoDataProvider;
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                repoDataProvider = new SnapshotRepoProvider_ES_6_8(repo);
-            } else {
-                repoDataProvider = new SnapshotRepoProvider_ES_7_10(repo);
-            }
-
-            if (repoDataProvider.getSnapshots().size() > 1) {
-                // Avoid having to deal with things like incremental snapshots
-                throw new IllegalArgumentException("Only repos with a single snapshot are supported at this time");
-            }
-
-            logger.info("Repo data read successfully");
-
-            // ==========================================================================================================
-            // Read the Snapshot details
-            // ==========================================================================================================
-            logger.info("==================================================================");
-            logger.info("Attempting to read Snapshot details...");
-            String snapshotIdString = repoDataProvider.getSnapshotId(snapshotName);
-
-            if (snapshotIdString == null) {
-                logger.error("Snapshot not found");
-                return;
-            }
-            SnapshotMetadata snapshotMetadata;
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                snapshotMetadata = new SnapshotMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName);
-            } else {
-                snapshotMetadata = new SnapshotMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName);
-            }
-            logger.info("Snapshot data read successfully");
-
-            if (!snapshotMetadata.isIncludeGlobalState() && ((movementType == MovementType.EVERYTHING) || (movementType == MovementType.METADATA))) {
-                throw new IllegalArgumentException("Snapshot does not include global state, so we can't move metadata");
-            }
-
-            if (!snapshotMetadata.isSuccessful()) {
-                throw new IllegalArgumentException("Snapshot must be successful; its actual state is " + snapshotMetadata.getState());
-            }
-
-            // We might not actually get this far if the snapshot is the wrong version; we'll probably have failed to
-            // parse one of the previous metadata files
-            if (sourceVersion != ClusterVersion.fromInt(snapshotMetadata.getVersionId())) {
-                throw new IllegalArgumentException("Snapshot version is " + snapshotMetadata.getVersionId() + ", but source version is " + sourceVersion);
-            }
-
-            if ((movementType == MovementType.EVERYTHING) || (movementType == MovementType.METADATA)) {
-                // ==========================================================================================================
-                // Read the Global Metadata
-                // ==========================================================================================================
-                logger.info("==================================================================");
-                logger.info("Attempting to read Global Metadata details...");
details..."); - GlobalMetadata globalMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - globalMetadata = new GlobalMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName); - } else { - globalMetadata = new GlobalMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName); - } - logger.info("Global Metadata read successfully"); - - // ========================================================================================================== - // Recreate the Global Metadata - // ========================================================================================================== - logger.info("=================================================================="); - logger.info("Attempting to recreate the Global Metadata..."); - - OpenSearchClient targetClient = new OpenSearchClient(targetConnection); - if (sourceVersion == ClusterVersion.ES_6_8) { - GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, templateWhitelist, componentTemplateWhitelist, List.of()); - var transformedRoot = transformer.transformGlobalMetadata(globalMetadata); - metadataCreator.create(transformedRoot); - } else if (sourceVersion == ClusterVersion.ES_7_10) { - GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateWhitelist, templateWhitelist); - var transformedRoot = transformer.transformGlobalMetadata(globalMetadata); - metadataCreator.create(transformedRoot); - } - } - - // ========================================================================================================== - // Read all the Index Metadata - // ========================================================================================================== - logger.info("=================================================================="); - logger.info("Attempting to read Index Metadata..."); - List indexMetadatas = new ArrayList<>(); - for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { - logger.info("Reading Index Metadata for index: " + index.getName()); - IndexMetadata indexMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName()); - } else { - indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName()); - } - indexMetadatas.add(indexMetadata); - } - logger.info("Index Metadata read successfully"); - - OpenSearchClient targetClient = new OpenSearchClient(targetConnection); - if ((movementType == MovementType.EVERYTHING) || (movementType == MovementType.METADATA)){ - // ========================================================================================================== - // Recreate the Indices - // ========================================================================================================== - logger.info("=================================================================="); - logger.info("Attempting to recreate the indices..."); - IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); - for (IndexMetadata indexMetadata : indexMetadatas) { - String reindexName = indexMetadata.getName() + indexSuffix; - logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target..."); - - var transformedRoot = transformer.transformIndexMetadata(indexMetadata); - indexCreator.create(transformedRoot, reindexName, indexMetadata.getId()); - } - } - - if ((movementType == 
-            if ((movementType == MovementType.EVERYTHING) || (movementType == MovementType.DATA)) {
-                // ==========================================================================================================
-                // Unpack the snapshot blobs
-                // ==========================================================================================================
-                logger.info("==================================================================");
-                logger.info("Unpacking blob files to disk...");
-
-                int bufferSize;
-                if (sourceVersion == ClusterVersion.ES_6_8) {
-                    bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
-                } else {
-                    bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
-                }
-                DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
-                SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, bufferSize);
-
-                for (IndexMetadata indexMetadata : indexMetadatas) {
-                    logger.info("Processing index: " + indexMetadata.getName());
-                    for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
-                        logger.info("=== Shard ID: " + shardId + " ===");
-
-                        // Get the shard metadata
-                        ShardMetadata shardMetadata;
-                        if (sourceVersion == ClusterVersion.ES_6_8) {
-                            shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                        } else {
-                            shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                        }
-
-                        // Unpack the shard
-                        SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata);
-                        unpacker.unpack();
-                    }
-                }
-
-                logger.info("Blob files unpacked successfully");
-
-                // ==========================================================================================================
-                // Reindex the documents
-                // ==========================================================================================================
-                logger.info("==================================================================");
-                logger.info("Reindexing the documents...");
-
-                LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
-                DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-
-                for (IndexMetadata indexMetadata : indexMetadatas) {
-                    for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
-                        logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");
-
-                        Flux<Document> documents = reader.readDocuments();
-                        String targetIndex = indexMetadata.getName() + indexSuffix;
-
-                        final int finalShardId = shardId; // Define in local context for the lambda
-                        reindexer.reindex(targetIndex, documents)
-                            .doOnError(error -> logger.error("Error during reindexing: " + error))
-                            .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
-                            // Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
-                            // shouldn't be done quite this way in the real RFS Worker.
-                            .block();
-                    }
-                }
-                logger.info("Refreshing target cluster to reflect newly added documents");
-                reindexer.refreshAllDocuments(targetConnection);
-                logger.info("Refresh complete");
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        // Optional, temporary flag to keep the Java process running after the steps have completed. This should get
-        // replaced as this app develops and becomes capable of determining what work remains to be completed
-        if (arguments.enablePersistentRun) {
-            while (true) {
-                logger.info("Process is in idle mode; to retry the migration, please restart this app.");
-                Thread.sleep(TimeUnit.MINUTES.toMillis(5));
-            }
-        }
-    }
-}
diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java
deleted file mode 100644
index 94823d833..000000000
--- a/RFS/src/main/java/com/rfs/RunRfsWorker.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package com.rfs;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParametersDelegate;
-
-import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.List;
-import java.util.UUID;
-
-import com.rfs.cms.ApacheHttpClient;
-import com.rfs.cms.OpenSearchWorkCoordinator;
-import com.rfs.cms.LeaseExpireTrigger;
-import com.rfs.cms.ScopedWorkCoordinator;
-import com.rfs.common.DefaultSourceRepoAccessor;
-import com.rfs.worker.ShardWorkPreparer;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.logging.log4j.Level;
-
-import com.rfs.common.ClusterVersion;
-import com.rfs.common.ConnectionDetails;
-import com.rfs.common.DocumentReindexer;
-import com.rfs.common.Logging;
-import com.rfs.common.LuceneDocumentsReader;
-import com.rfs.common.OpenSearchClient;
-import com.rfs.common.S3Uri;
-import com.rfs.common.S3Repo;
-import com.rfs.common.SnapshotCreator;
-import com.rfs.common.SourceRepo;
-import com.rfs.common.TryHandlePhaseFailure;
-import com.rfs.models.GlobalMetadata;
-import com.rfs.models.IndexMetadata;
-import com.rfs.models.ShardMetadata;
-import com.rfs.common.S3SnapshotCreator;
-import com.rfs.common.SnapshotRepo;
-import com.rfs.common.SnapshotShardUnpacker;
-import com.rfs.transformers.TransformFunctions;
-import com.rfs.transformers.Transformer;
-import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
-import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
-import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
-import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
-import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
-import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
-import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
-import com.rfs.worker.DocumentsRunner;
-import com.rfs.worker.IndexRunner;
-import com.rfs.worker.MetadataRunner;
-import com.rfs.worker.SnapshotRunner;
-
-@Slf4j
-public class RunRfsWorker {
-    public static final int PROCESS_TIMED_OUT = 1;
-
-    public static class Args {
-        @Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
-        public String snapshotName;
-
-        @Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = true)
-        public String s3LocalDirPath;
-
-        @Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
-        public String s3RepoUri;
-
-        @Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
-        public String s3Region;
-
-        @Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
-        public String luceneDirPath;
-
-        @ParametersDelegate
-        public ConnectionDetails.SourceArgs sourceArgs;
-
-        @ParametersDelegate
-        public ConnectionDetails.TargetArgs targetArgs;
-
-        @Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
-            + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
-        public List<String> indexAllowlist = List.of();
-
-        @Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
-            + " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
-        public List<String> indexTemplateAllowlist = List.of();
-
-        @Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate"
-            + " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
-        public List<String> componentTemplateAllowlist = List.of();
-
-        @Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
-            + " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false)
-        public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L;
-
-        // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
-        @Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
-            + " This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements. Default: 0")
-            , required = false)
-        public int minNumberOfReplicas = 0;
-
-        @Parameter(names = {"--log-level"}, description = "What log level you want. Default: 'info'", required = false, converter = Logging.ArgsConverter.class)
-        public Level logLevel = Level.INFO;
-    }
-
-    public static void main(String[] args) throws Exception {
-        // Parse out the args
-        Args arguments = new Args();
-        JCommander.newBuilder()
-            .addObject(arguments)
-            .build()
-            .parse(args);
-
-        final String snapshotName = arguments.snapshotName;
-        final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
-        final String s3RepoUri = arguments.s3RepoUri;
-        final String s3Region = arguments.s3Region;
-        final Path luceneDirPath = Paths.get(arguments.luceneDirPath);
-        final List<String> indexAllowlist = arguments.indexAllowlist;
-        final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
-        final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
-        final long maxShardSizeBytes = arguments.maxShardSizeBytes;
-        final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
-        final Level logLevel = arguments.logLevel;
-
-        Logging.setLevel(logLevel);
-
-        final ConnectionDetails sourceConnection = new ConnectionDetails(arguments.sourceArgs);
-        final ConnectionDetails targetConnection = new ConnectionDetails(arguments.targetArgs);
-
-        try (var processManager = new LeaseExpireTrigger(workItemId -> {
-            log.error("Terminating RunRfsWorker because its lease has expired for " + workItemId);
-            System.exit(PROCESS_TIMED_OUT);
-        }, Clock.systemUTC())) {
-            log.info("Running RfsWorker");
-            OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
-            OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
-
-            SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
-            SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
-
-            SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
-            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-            GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
-            GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
-            Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
-            new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
-
-            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-            IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-            new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();
-
-            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-            DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
-            var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
-                luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-            DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-            var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetArgs.getHost())),
-                5, UUID.randomUUID().toString());
-            var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
-            new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName, indexAllowlist);
-            new DocumentsRunner(scopedWorkCoordinator,
-                (name, shard) -> shardMetadataFactory.fromRepo(snapshotName, name, shard),
-                unpackerFactory,
-                path -> new LuceneDocumentsReader(path),
-                reindexer)
-                .migrateNextShard();
-        } catch (Exception e) {
-            log.error("Unexpected error running RfsWorker", e);
-            throw e;
-        }
-    }
-}
diff --git a/RFS/src/test/resources/log4j2.properties b/RFS/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..ec5132a80
--- /dev/null
+++ b/RFS/src/test/resources/log4j2.properties
@@ -0,0 +1,25 @@
+# Set the status level for the configuration
+status = DEBUG
+
+# Define the root logger
+rootLogger.level = info
+rootLogger.appenderRef.console.ref = Console
+
+# Define a console appender
+appender.console.type = Console
+appender.console.name = Console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %threadName %-5p %c{1}:%L - %m%n
+
+# Logger definitions
+logger.rfs.name = com.rfs
+logger.rfs.level = debug
+
+logger.wire.name = org.apache.hc.client5.http
+logger.wire.level = info
+
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = info
+
+logger.dockerclientdeps.name = com.github.dockerjava.zerodep
+logger.dockerclientdeps.level = info
\ No newline at end of file
diff --git a/RFS/src/test/resources/log4j2.xml b/RFS/src/test/resources/log4j2.xml
deleted file mode 100644
index 99e1e6614..000000000
--- a/RFS/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,23 +0,0 @@