From 49282c111c593bb2b1ee76b50102c06fa1f2c627 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Sat, 8 Jun 2024 11:35:16 +0530 Subject: [PATCH] Implementation of RemoteWritableEntity for objects to uploaded to remote store (#13834) * Implementation of RemoteWritableEntity for objects to uploaded to remote store Signed-off-by: Sooraj Sinha --- .../remote/RemoteWritableEntityStore.java | 2 + .../common/remote/RemoteWriteableEntity.java | 3 + .../remote/RemoteClusterStateUtils.java | 16 ++ .../model/RemoteClusterMetadataManifest.java | 153 +++++++++++ .../model/RemoteCoordinationMetadata.java | 107 ++++++++ .../remote/model/RemoteCustomMetadata.java | 119 ++++++++ .../remote/model/RemoteGlobalMetadata.java | 70 +++++ .../remote/model/RemoteIndexMetadata.java | 106 ++++++++ .../RemotePersistentSettingsMetadata.java | 107 ++++++++ .../remote/model/RemoteTemplatesMetadata.java | 108 ++++++++ .../RemoteClusterMetadataManifestTests.java | 253 ++++++++++++++++++ .../RemoteCoordinationMetadataTests.java | 240 +++++++++++++++++ .../model/RemoteCustomMetadataTests.java | 253 ++++++++++++++++++ .../model/RemoteGlobalMetadataTests.java | 210 +++++++++++++++ .../model/RemoteIndexMetadataTests.java | 189 +++++++++++++ ...RemotePersistentSettingsMetadataTests.java | 230 ++++++++++++++++ .../model/RemoteTemplatesMetadataTests.java | 242 +++++++++++++++++ 17 files changed, 2408 insertions(+) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadata.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteIndexMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java index ccf7cafff1730..385c6f20ba58d 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java @@ -8,6 +8,7 @@ package org.opensearch.common.remote; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; import java.io.IOException; @@ -18,6 +19,7 @@ * @param The object type which can be uploaded to or downloaded from remote storage. * @param The wrapper entity which provides methods for serializing/deserializing entity T. */ +@ExperimentalApi public interface RemoteWritableEntityStore> { public void writeAsync(U entity, ActionListener listener); diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java index 778c24dce2e27..773ddce5b9cc8 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java @@ -8,6 +8,8 @@ package org.opensearch.common.remote; +import org.opensearch.common.annotation.ExperimentalApi; + import java.io.IOException; import java.io.InputStream; @@ -17,6 +19,7 @@ * * @param The object type which can be uploaded to or downloaded from remote storage. */ +@ExperimentalApi public interface RemoteWriteableEntity { /** * @return An InputStream created by serializing the entity T diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index 500d1af0211e8..f87b0a6e401d3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -8,14 +8,30 @@ package org.opensearch.gateway.remote; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.core.xcontent.ToXContent; + import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.Map; /** * Utility class for Remote Cluster State */ public class RemoteClusterStateUtils { + + public static final String DELIMITER = "__"; public static final String PATH_DELIMITER = "/"; + public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; + public static final String METADATA_NAME_FORMAT = "%s.dat"; + public static final String METADATA_NAME_PLAIN_FORMAT = "%s"; + public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; + + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams( + Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY) + ); public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java new file mode 100644 index 0000000000000..62bc4ce022a37 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; + +/** + * Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store + */ +public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity { + + public static final String MANIFEST = "manifest"; + public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; + + public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; + public static final String COMMITTED = "C"; + public static final String PUBLISHED = "P"; + + /** + * Manifest format compatible with older codec v0, where codec version was missing. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V0 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); + /** + * Manifest format compatible with older codec v1, where global metadata was missing. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); + + /** + * Manifest format compatible with codec v2, where we introduced codec versions/global metadata. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-manifest", + METADATA_MANIFEST_NAME_FORMAT, + ClusterMetadataManifest::fromXContent + ); + + private ClusterMetadataManifest clusterMetadataManifest; + + public RemoteClusterMetadataManifest( + final ClusterMetadataManifest clusterMetadataManifest, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.clusterMetadataManifest = clusterMetadataManifest; + } + + public RemoteClusterMetadataManifest( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(MANIFEST), MANIFEST); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ + // + String blobFileName = String.join( + DELIMITER, + MANIFEST, + RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()), + RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()), + (clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(clusterMetadataManifest.getCodecVersion()) + // Keep the codec version at last place only, during we read last place to determine codec version. + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(MANIFEST, blobName); + } + + @Override + public InputStream serialize() throws IOException { + return CLUSTER_METADATA_MANIFEST_FORMAT.serialize( + clusterMetadataManifest, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); + } + + @Override + public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException { + ChecksumBlobStoreFormat blobStoreFormat = getClusterMetadataManifestBlobStoreFormat(); + return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + private int getManifestCodecVersion() { + assert blobName != null; + String[] splitName = blobName.split(DELIMITER); + if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) { + return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. + } else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 + // is used. + return ClusterMetadataManifest.CODEC_V0; + } else { + throw new IllegalArgumentException("Manifest file name is corrupted"); + } + } + + private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat() { + long codecVersion = getManifestCodecVersion(); + if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { + return CLUSTER_METADATA_MANIFEST_FORMAT; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V1; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V0; + } + throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java new file mode 100644 index 0000000000000..95ad7f7724a08 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; + +/** + * Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store + */ +public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity { + + public static final String COORDINATION_METADATA = "coordination"; + public static final ChecksumBlobStoreFormat COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "coordination", + METADATA_NAME_PLAIN_FORMAT, + CoordinationMetadata::fromXContent + ); + + private CoordinationMetadata coordinationMetadata; + private long metadataVersion; + + public RemoteCoordinationMetadata( + final CoordinationMetadata coordinationMetadata, + final long metadataVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.coordinationMetadata = coordinationMetadata; + this.metadataVersion = metadataVersion; + } + + public RemoteCoordinationMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public InputStream serialize() throws IOException { + return COORDINATION_METADATA_FORMAT.serialize( + coordinationMetadata, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); + } + + @Override + public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException { + return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java new file mode 100644 index 0000000000000..682cac4b39d10 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.Metadata.Custom; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; + +/** + * Wrapper class for uploading/downloading {@link Custom} to/from remote blob store + */ +public class RemoteCustomMetadata extends AbstractRemoteWritableBlobEntity { + + public static final String CUSTOM_METADATA = "custom"; + public static final String CUSTOM_DELIMITER = "--"; + public final ChecksumBlobStoreFormat customBlobStoreFormat; + + private Custom custom; + private final String customType; + private long metadataVersion; + + public RemoteCustomMetadata( + final Custom custom, + final String customType, + final long metadataVersion, + final String clusterUUID, + Compressor compressor, + NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.custom = custom; + this.customType = customType; + this.metadataVersion = metadataVersion; + this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( + "custom", + METADATA_NAME_PLAIN_FORMAT, + (parser -> Metadata.Custom.fromXContent(parser, customType)) + ); + } + + public RemoteCustomMetadata( + final String blobName, + final String customType, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + this.customType = customType; + this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( + "custom", + METADATA_NAME_PLAIN_FORMAT, + (parser -> Metadata.Custom.fromXContent(parser, customType)) + ); + } + + @Override + public BlobPathParameters getBlobPathParameters() { + String prefix = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, customType); + return new BlobPathParameters(List.of(GLOBAL_METADATA_PATH_TOKEN), prefix); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ + // + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public InputStream serialize() throws IOException { + return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) + .streamInput(); + } + + @Override + public Custom deserialize(final InputStream inputStream) throws IOException { + return customBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, customType), blobName); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadata.java new file mode 100644 index 0000000000000..8e41b155ecb93 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadata.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +/** + * Wrapper class for uploading/downloading global metadata ({@link Metadata}) to/from remote blob store + */ +public class RemoteGlobalMetadata extends AbstractRemoteWritableBlobEntity { + + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "metadata", + METADATA_NAME_FORMAT, + Metadata::fromXContent + ); + + public RemoteGlobalMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + throw new UnsupportedOperationException(); + } + + @Override + public String generateBlobFileName() { + throw new UnsupportedOperationException(); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream serialize() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Metadata deserialize(final InputStream inputStream) throws IOException { + return GLOBAL_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java new file mode 100644 index 0000000000000..0966a7b09fe17 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; + +/** + * Wrapper class for uploading/downloading {@link IndexMetadata} to/from remote blob store + */ +public class RemoteIndexMetadata extends AbstractRemoteWritableBlobEntity { + + public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; + + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "index-metadata", + METADATA_NAME_PLAIN_FORMAT, + IndexMetadata::fromXContent + ); + public static final String INDEX_PATH_TOKEN = "index"; + + private IndexMetadata indexMetadata; + + public RemoteIndexMetadata( + final IndexMetadata indexMetadata, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.indexMetadata = indexMetadata; + } + + public RemoteIndexMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(INDEX_PATH_TOKEN, indexMetadata.getIndexUUID()), "metadata"); + } + + @Override + public String generateBlobFileName() { + String blobFileName = String.join( + RemoteClusterStateUtils.DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(indexMetadata.getVersion()), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION) // Keep the codec version at last place only, during reads we read last + // place to determine codec version. + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), blobName); + } + + @Override + public InputStream serialize() throws IOException { + return INDEX_METADATA_FORMAT.serialize( + indexMetadata, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); + } + + @Override + public IndexMetadata deserialize(final InputStream inputStream) throws IOException { + // Blob name parameter is redundant + return INDEX_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java new file mode 100644 index 0000000000000..e7604f7b62e72 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +/** + * Wrapper class for uploading/downloading persistent {@link Settings} to/from remote blob store + */ +public class RemotePersistentSettingsMetadata extends AbstractRemoteWritableBlobEntity { + + public static final String SETTING_METADATA = "settings"; + + public static final ChecksumBlobStoreFormat SETTINGS_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "settings", + METADATA_NAME_FORMAT, + Settings::fromXContent + ); + + private Settings persistentSettings; + private long metadataVersion; + + public RemotePersistentSettingsMetadata( + final Settings settings, + final long metadataVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.persistentSettings = settings; + this.metadataVersion = metadataVersion; + } + + public RemotePersistentSettingsMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of("global-metadata"), SETTING_METADATA); + } + + @Override + public String generateBlobFileName() { + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public InputStream serialize() throws IOException { + return SETTINGS_METADATA_FORMAT.serialize( + persistentSettings, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); + } + + @Override + public Settings deserialize(final InputStream inputStream) throws IOException { + return SETTINGS_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(SETTING_METADATA, blobName); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java new file mode 100644 index 0000000000000..bbce063a5a0f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +/** + * Wrapper class for uploading/downloading {@link TemplatesMetadata} to/from remote blob store + */ +public class RemoteTemplatesMetadata extends AbstractRemoteWritableBlobEntity { + + public static final String TEMPLATES_METADATA = "templates"; + + public static final ChecksumBlobStoreFormat TEMPLATES_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "templates", + METADATA_NAME_FORMAT, + TemplatesMetadata::fromXContent + ); + private TemplatesMetadata templatesMetadata; + private long metadataVersion; + + public RemoteTemplatesMetadata( + final TemplatesMetadata templatesMetadata, + final long metadataVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.templatesMetadata = templatesMetadata; + this.metadataVersion = metadataVersion; + } + + public RemoteTemplatesMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of("global-metadata"), TEMPLATES_METADATA); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ + // + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public InputStream serialize() throws IOException { + return TEMPLATES_METADATA_FORMAT.serialize( + templatesMetadata, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); + } + + @Override + public TemplatesMetadata deserialize(final InputStream inputStream) throws IOException { + return TEMPLATES_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(TEMPLATES_METADATA, blobName); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java new file mode 100644 index 0000000000000..7cb80a1600c03 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java @@ -0,0 +1,253 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteClusterMetadataManifestTests extends OpenSearchTestCase { + + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + + private static final long TERM = 2L; + private static final long VERSION = 5L; + + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/manifest"; + RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "manifest" })); + } + + public void testBlobPathParameters() { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(MANIFEST))); + assertThat(params.getFilePrefix(), is(RemoteClusterMetadataManifest.MANIFEST)); + } + + public void testGenerateBlobFileName() { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertThat(nameTokens[0], is(RemoteClusterMetadataManifest.MANIFEST)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(TERM)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), is(VERSION)); + assertThat(nameTokens[3], is("C")); + assertThat(RemoteStoreUtils.invertLong(nameTokens[4]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[5], is(String.valueOf(MANIFEST_CURRENT_CODEC_VERSION))); + } + + public void testGetUploadedMetadata() throws IOException { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertThat(uploadedMetadata.getComponent(), is(MANIFEST)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); + } + } + + public void testSerDe() throws IOException { + ClusterMetadataManifest manifest = getClusterMetadataManifest(); + RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest( + manifest, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + ClusterMetadataManifest readManifest = remoteObjectForUpload.deserialize(inputStream); + assertThat(readManifest, is(manifest)); + } + + String blobName = "/usr/local/manifest__1__2__3__4__5__6"; + RemoteClusterMetadataManifest invalidRemoteObject = new RemoteClusterMetadataManifest( + blobName, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(IllegalArgumentException.class, () -> invalidRemoteObject.deserialize(new ByteArrayInputStream(new byte[0]))); + } + + private ClusterMetadataManifest getClusterMetadataManifest() { + return ClusterMetadataManifest.builder() + .opensearchVersion(Version.CURRENT) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .nodeId("test-node") + .clusterUUID("test-uuid") + .previousClusterUUID("_NA_") + .stateUUID("state-uuid") + .clusterTerm(TERM) + .stateVersion(VERSION) + .committed(true) + .coordinationMetadata(new UploadedMetadataAttribute("test-attr", "uploaded-file")) + .build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java new file mode 100644 index 0000000000000..9484afe6b7d6c --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java @@ -0,0 +1,240 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteCoordinationMetadataTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long TERM = 3L; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemoteCoordinationMetadata remoteObjectForDownload = new RemoteCoordinationMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteCoordinationMetadata remoteObjectForDownload = new RemoteCoordinationMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteCoordinationMetadata remoteObjectForDownload = new RemoteCoordinationMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/coordinationMetadata"; + RemoteCoordinationMetadata remoteObjectForDownload = new RemoteCoordinationMetadata( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "coordinationMetadata" })); + } + + public void testBlobPathParameters() { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN))); + assertThat(params.getFilePrefix(), is(RemoteCoordinationMetadata.COORDINATION_METADATA)); + } + + public void testGenerateBlobFileName() { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertThat(nameTokens[0], is(RemoteCoordinationMetadata.COORDINATION_METADATA)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(METADATA_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[3], is(String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION))); + + } + + public void testGetUploadedMetadata() throws IOException { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertThat(uploadedMetadata.getComponent(), is(RemoteCoordinationMetadata.COORDINATION_METADATA)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); + } + } + + public void testSerDe() throws IOException { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteObjectForUpload = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + CoordinationMetadata readcoordinationMetadata = remoteObjectForUpload.deserialize(inputStream); + assertThat(readcoordinationMetadata, is(coordinationMetadata)); + } + } + + private CoordinationMetadata getCoordinationMetadata() { + return CoordinationMetadata.builder() + .term(TERM) + .lastAcceptedConfiguration(new VotingConfiguration(Set.of("node1"))) + .lastCommittedConfiguration(new VotingConfiguration(Set.of("node1"))) + .addVotingConfigExclusion(new VotingConfigExclusion("node2", " node-2")) + .build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java new file mode 100644 index 0000000000000..a0c60ee2088b8 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java @@ -0,0 +1,253 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.metadata.IndexGraveyard; +import org.opensearch.cluster.metadata.Metadata.Custom; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteCustomMetadataTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final String CUSTOM_TYPE = "test-custom"; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + // namedXContentRegistry = new NamedXContentRegistry(List.of(new Entry(Metadata.Custom.class, new ParseField(CUSTOM_TYPE), + // p->TestCustomMetadata.fromXContent(CustomMetadata1::new, p)))); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemoteCustomMetadata remoteObjectForDownload = new RemoteCustomMetadata( + TEST_BLOB_NAME, + "test-custom", + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteCustomMetadata remoteObjectForDownload = new RemoteCustomMetadata( + TEST_BLOB_NAME, + "test-custom", + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteCustomMetadata remoteObjectForDownload = new RemoteCustomMetadata( + TEST_BLOB_NAME, + "test-custom", + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/customMetadata"; + RemoteCustomMetadata remoteObjectForDownload = new RemoteCustomMetadata( + uploadedFile, + "test-custom", + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "customMetadata" })); + } + + public void testBlobPathParameters() { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN))); + String expectedPrefix = CUSTOM_METADATA + CUSTOM_DELIMITER + CUSTOM_TYPE; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + } + + public void testGenerateBlobFileName() { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + String expectedPrefix = CUSTOM_METADATA + CUSTOM_DELIMITER + CUSTOM_TYPE; + assertThat(nameTokens[0], is(expectedPrefix)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(METADATA_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[3], is(String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION))); + + } + + public void testGetUploadedMetadata() throws IOException { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + "test-custom", + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + String expectedPrefix = CUSTOM_METADATA + CUSTOM_DELIMITER + CUSTOM_TYPE; + assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); + } + } + + public void testSerDe() throws IOException { + Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata( + customMetadata, + IndexGraveyard.TYPE, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + Custom readCustomMetadata = remoteObjectForUpload.deserialize(inputStream); + assertThat(readCustomMetadata, is(customMetadata)); + } + } + + private Custom getCustomMetadata() { + return IndexGraveyard.builder().addTombstone(new Index("test-index", "3q2423")).build(); + } + +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java new file mode 100644 index 0000000000000..24752302dc3df --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java @@ -0,0 +1,210 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteGlobalMetadataTests extends OpenSearchTestCase { + + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long TERM = 3L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/globalMetadata"; + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "globalMetadata" })); + } + + public void testBlobPathParameters() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(UnsupportedOperationException.class, remoteObjectForDownload::getBlobPathParameters); + } + + public void testGenerateBlobFileName() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(UnsupportedOperationException.class, remoteObjectForDownload::generateBlobFileName); + } + + public void testGetUploadedMetadata() { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(UnsupportedOperationException.class, remoteObjectForDownload::getUploadedMetadata); + } + + public void testSerDe() throws IOException { + Metadata globalMetadata = getGlobalMetadata(); + try ( + InputStream inputStream = GLOBAL_METADATA_FORMAT.serialize( + globalMetadata, + TEST_BLOB_FILE_NAME, + compressor, + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput() + ) { + RemoteGlobalMetadata remoteObjectForDownload = new RemoteGlobalMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(inputStream.available(), greaterThan(0)); + Metadata readglobalMetadata = remoteObjectForDownload.deserialize(inputStream); + assertTrue(Metadata.isGlobalStateEquals(readglobalMetadata, globalMetadata)); + } + } + + private Metadata getGlobalMetadata() { + return Metadata.builder() + .templates( + TemplatesMetadata.builder() + .put( + IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3)) + .patterns(Arrays.asList("bar-*", "foo-*")) + .settings( + Settings.builder() + .put("index.random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)) + .build() + ) + .build() + ) + .build() + ) + .coordinationMetadata( + CoordinationMetadata.builder() + .term(TERM) + .lastAcceptedConfiguration(new VotingConfiguration(Set.of("node1"))) + .lastCommittedConfiguration(new VotingConfiguration(Set.of("node1"))) + .addVotingConfigExclusion(new VotingConfigExclusion("node2", " node-2")) + .build() + ) + .build(); + + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteIndexMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteIndexMetadataTests.java new file mode 100644 index 0000000000000..8bf053e45e0b3 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteIndexMetadataTests.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteIndexMetadataTests extends OpenSearchTestCase { + + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long VERSION = 5L; + + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + this.clusterName = "test-cluster-name"; + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathParameters() { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(INDEX_PATH_TOKEN, indexMetadata.getIndexUUID()))); + assertThat(params.getFilePrefix(), is("metadata")); + } + + public void testGenerateBlobFileName() { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertThat(nameTokens[0], is("metadata")); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[3], is(String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION))); + } + + public void testGetUploadedMetadata() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getBlobFileName())); + } + } + + public void testSerDe() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + assertThat(inputStream.available(), greaterThan(0)); + IndexMetadata readIndexMetadata = remoteObjectForUpload.deserialize(inputStream); + assertThat(readIndexMetadata, is(indexMetadata)); + } + } + + private IndexMetadata getIndexMetadata() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .version(VERSION) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java new file mode 100644 index 0000000000000..850c18f03fa49 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java @@ -0,0 +1,230 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemotePersistentSettingsMetadataTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemotePersistentSettingsMetadata remoteObjectForDownload = new RemotePersistentSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemotePersistentSettingsMetadata remoteObjectForDownload = new RemotePersistentSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemotePersistentSettingsMetadata remoteObjectForDownload = new RemotePersistentSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/settings"; + RemotePersistentSettingsMetadata remoteObjectForDownload = new RemotePersistentSettingsMetadata( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "settings" })); + } + + public void testBlobPathParameters() { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN))); + assertThat(params.getFilePrefix(), is(RemotePersistentSettingsMetadata.SETTING_METADATA)); + } + + public void testGenerateBlobFileName() { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertThat(nameTokens[0], is(RemotePersistentSettingsMetadata.SETTING_METADATA)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(METADATA_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[3], is(String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION))); + + } + + public void testGetUploadedMetadata() throws IOException { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertThat(uploadedMetadata.getComponent(), is(RemotePersistentSettingsMetadata.SETTING_METADATA)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); + } + } + + public void testSerDe() throws IOException { + Settings settings = getSettings(); + RemotePersistentSettingsMetadata remoteObjectForUpload = new RemotePersistentSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + Settings readsettings = remoteObjectForUpload.deserialize(inputStream); + assertThat(readsettings, is(settings)); + } + } + + private Settings getSettings() { + return Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java new file mode 100644 index 0000000000000..b86044003aa55 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java @@ -0,0 +1,242 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteTemplatesMetadataTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); + + RemoteTemplatesMetadata remoteObjectForDownload = new RemoteTemplatesMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); + } + + public void testFullBlobName() { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteTemplatesMetadata remoteObjectForDownload = new RemoteTemplatesMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteTemplatesMetadata remoteObjectForDownload = new RemoteTemplatesMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/settings"; + RemoteTemplatesMetadata remoteObjectForDownload = new RemoteTemplatesMetadata( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "settings" })); + } + + public void testBlobPathParameters() { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN))); + assertThat(params.getFilePrefix(), is(RemoteTemplatesMetadata.TEMPLATES_METADATA)); + } + + public void testGenerateBlobFileName() { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertThat(nameTokens[0], is(RemoteTemplatesMetadata.TEMPLATES_METADATA)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[1]), is(METADATA_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[2]), lessThanOrEqualTo(System.currentTimeMillis())); + assertThat(nameTokens[3], is(String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION))); + + } + + public void testGetUploadedMetadata() throws IOException { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertThat(uploadedMetadata.getComponent(), is(RemoteTemplatesMetadata.TEMPLATES_METADATA)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); + } + } + + public void testSerDe() throws IOException { + TemplatesMetadata settings = getTemplatesMetadata(); + RemoteTemplatesMetadata remoteObjectForUpload = new RemoteTemplatesMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + TemplatesMetadata readsettings = remoteObjectForUpload.deserialize(inputStream); + assertThat(readsettings, is(settings)); + } + } + + private TemplatesMetadata getTemplatesMetadata() { + return TemplatesMetadata.builder() + .put( + IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3)) + .patterns(Arrays.asList("bar-*", "foo-*")) + .settings( + Settings.builder().put("index.random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build() + ) + .build() + ) + .build(); + } +}