Skip to content

Commit

Permalink
Create Remote Object managers and use them in orchestration from Remo…
Browse files Browse the repository at this point in the history
…teClusterStateService

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 8, 2024
1 parent 49282c1 commit e217065
Show file tree
Hide file tree
Showing 21 changed files with 1,760 additions and 1,152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
),
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
);
} catch (IOException e) {
Expand All @@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
+ "/cluster-state/"
+ prevClusterUUID
+ "/manifest"
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
),
segmentRepoPath.resolve("cluster-state/")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;

/**
* Encapsulates all valid cluster level settings.
*
Expand Down Expand Up @@ -715,9 +719,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
23 changes: 16 additions & 7 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteUploadDetails;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -693,7 +694,7 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
final ClusterMetadataManifest manifest;
final RemoteUploadDetails manifestDetails;
if (shouldWriteFullClusterState(clusterState)) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
Expand All @@ -711,14 +712,21 @@ public void setLastAcceptedState(ClusterState clusterState) {
clusterState.metadata().clusterUUID()
);
}
manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest);
manifestDetails = remoteClusterStateService.writeIncrementalMetadata(
lastAcceptedState,
clusterState,
lastAcceptedManifest
);
}
assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifest;
assert verifyManifestAndClusterState(
Objects.requireNonNull(manifestDetails).getClusterMetadataManifest(),
clusterState
) == true : "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifestDetails.getClusterMetadataManifest();
lastAcceptedState = clusterState;
} catch (Exception e) {
remoteClusterStateService.writeMetadataFailed();
Expand Down Expand Up @@ -767,11 +775,12 @@ public void markLastAcceptedStateAsCommitted() {
metadataBuilder.clusterUUIDCommitted(true);
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
}
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
final RemoteUploadDetails committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
lastAcceptedManifest
);
lastAcceptedManifest = committedManifest;
assert committedManifestDetails != null;
lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest();
lastAcceptedState = clusterState;
} catch (Exception e) {
handleExceptionOnWrite(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;

/**
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
Expand All @@ -50,7 +45,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {

public static final int RETAINED_MANIFESTS = 10;
public static final int SKIP_CLEANUP_STATE_CHANGES = 10;
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueSeconds(15);
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE;

/**
Expand All @@ -70,7 +65,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private TimeValue staleFileCleanupInterval;
private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private volatile AsyncStaleFileDeletion staleFileDeletionTask;
private AsyncStaleFileDeletion staleFileDeletionTask;
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
Expand Down Expand Up @@ -150,12 +145,7 @@ void cleanUpStaleFiles() {

private void addStaleGlobalMetadataPath(String fileName, Set<String> filesToKeep, Set<String> staleGlobalMetadataPaths) {
if (!filesToKeep.contains(fileName)) {
String[] splitPath = fileName.split("/");
staleGlobalMetadataPaths.add(
new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
splitPath[splitPath.length - 1]
)
);
staleGlobalMetadataPaths.add(fileName);
}
}

Expand All @@ -172,15 +162,24 @@ void deleteClusterMetadata(
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
.forEach(
uploadedIndexMetadata -> filesToKeep.add(
RemoteClusterStateUtils.getFormattedFileName(
uploadedIndexMetadata.getUploadedFilename(),
clusterMetadataManifest.getCodecVersion()
)
)
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
filesToKeep.add(
RemoteClusterStateUtils.getFormattedFileName(
clusterMetadataManifest.getGlobalMetadataFileName(),
clusterMetadataManifest.getCodecVersion()
)
);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
Expand All @@ -191,14 +190,21 @@ void deleteClusterMetadata(
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
staleManifestPaths.add(
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID).buildAsString()
+ blobMetadata.name()
);
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
addStaleGlobalMetadataPath(
RemoteClusterStateUtils.getFormattedFileName(
clusterMetadataManifest.getGlobalMetadataFileName(),
clusterMetadataManifest.getCodecVersion()
),
filesToKeep,
staleGlobalMetadataPaths
);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
addStaleGlobalMetadataPath(
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
Expand All @@ -225,8 +231,10 @@ void deleteClusterMetadata(
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleIndexMetadataPaths.add(
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
RemoteClusterStateUtils.getFormattedFileName(
uploadedIndexMetadata.getUploadedFilename(),
clusterMetadataManifest.getCodecVersion()
)
);
}
});
Expand All @@ -237,9 +245,9 @@ void deleteClusterMetadata(
return;
}

deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -267,8 +275,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
try {
getBlobStoreTransferService().listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST_FILE_PREFIX,
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID),
MANIFEST,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
Expand Down Expand Up @@ -312,7 +320,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
clusterUUIDs.forEach(
clusterUUID -> getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
RemoteClusterStateUtils.getCusterMetadataBasePath(
remoteClusterStateService.getBlobStoreRepository(),
clusterName,
clusterUUID
),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand All @@ -336,12 +348,9 @@ public void onFailure(Exception e) {
}

// package private for testing
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
void deleteStalePaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths
);
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
}

/**
Expand Down
Loading

0 comments on commit e217065

Please sign in to comment.