Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use remote publication flag to decide which custom objects to upload #14338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
Expand All @@ -25,10 +27,9 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
Expand Down Expand Up @@ -126,18 +127,35 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
ClusterState clusterState,
ClusterState previousClusterState,
boolean isRemotePublicationEnabled,
boolean isFirstUpload
) {
if (!isRemotePublicationEnabled) {
// When isRemotePublicationEnabled is false, we do not want store any custom objects
return DiffableUtils.diff(
Collections.emptyMap(),
Collections.emptyMap(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
if (isFirstUpload) {
// For first upload of ephemeral metadata, we want to upload all customs
return DiffableUtils.diff(
Collections.emptyMap(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
return updatedCustoms;
return DiffableUtils.diff(
previousClusterState.customs(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS;
Expand Down Expand Up @@ -159,6 +161,7 @@
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -201,6 +204,9 @@
threadPool
);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
}

/**
Expand All @@ -221,15 +227,15 @@
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
clusterState.metadata().customs(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
soosinha marked this conversation as resolved.
Show resolved Hide resolved
true,
true,
true,
true,
true,
true,
clusterState.customs(),
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable())
);
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
Expand Down Expand Up @@ -285,28 +291,17 @@
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

final Map<String, UploadedMetadataAttribute> customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, Metadata.Custom> customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> clusterStateCustomsToBeDeleted = new HashMap<>(
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
);
final Map<String, ClusterState.Custom> clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
for (final String custom : clusterState.metadata().customs().keySet()) {
// remove all the customs which are present currently
customsToBeDeletedFromRemote.remove(custom);
}
final Map<String, IndexMetadata> indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices());
for (final String custom : clusterState.customs().keySet()) {
// remove all the custom which are present currently
clusterStateCustomsToBeDeleted.remove(custom);
}
int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
Expand Down Expand Up @@ -337,42 +332,44 @@
indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName());
}

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
final DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable());
List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
final List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v));

UploadedMetadataResults uploadedMetadataResults;
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
// If file is empty and codec is 1 then write global metadata.
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
;
boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTransientSettingsMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTransientSettingsMetadata = Metadata.isTransientSettingsMetadataEqual(
previousClusterState.metadata(),
clusterState.metadata()
) == false;
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
// ToDo: check if these needs to be updated or not
final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata

final boolean updateDiscoveryNodes = isPublicationEnabled
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload,
customsDiff.getUpserts(),
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
updateDiscoveryNodes,
updateClusterBlocks,
updateTransientSettingsMetadata,
clusterStateCustomsToUpload,
clusterStateCustomsDiff.getUpserts(),
updateHashesOfConsistentSettings,
indicesRoutingToUpload
);
Expand All @@ -382,10 +379,11 @@
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap);
allUploadedClusterStateCustomsMap.putAll(uploadedMetadataResults.uploadedClusterStateCustomMetadataMap);
// remove the data for removed custom/indices
customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove);
customsDiff.getDeletes().forEach(allUploadedCustomMap::remove);
indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove);
clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove);
clusterStateCustomsDiff.getDeletes().forEach(allUploadedClusterStateCustomsMap::remove);

if (!updateCoordinationMetadata) {
uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata();
Expand All @@ -399,31 +397,24 @@
if (!updateTemplatesMetadata) {
uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata();
}
if (!updateDiscoveryNodes && !firstUploadForSplitGlobalMetadata) {
if (!updateDiscoveryNodes) {
uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata();
}
if (!updateClusterBlocks && !firstUploadForSplitGlobalMetadata) {
if (!updateClusterBlocks) {
uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata();
}
if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) {
if (!updateHashesOfConsistentSettings) {
uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings();
}
if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap();
}
if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap();
}
uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap;
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = allUploadedClusterStateCustomsMap;
uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values());

List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = new ArrayList<>();
allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(
uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting(
previousManifest,
uploadedMetadataResults.uploadedIndicesRoutingMetadata,
routingTableDiff.getDeletes()
);
uploadedMetadataResults.uploadedIndicesRoutingMetadata = allUploadedIndicesRouting;

final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
Expand All @@ -448,7 +439,7 @@
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size(),
customsDiff.getUpserts().size(),
indicesRoutingToUpload.size()
);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
Expand All @@ -464,7 +455,7 @@
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()

Check warning on line 458 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L458

Added line #L458 was not covered by tests
);
} else {
logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage);
Expand All @@ -479,7 +470,7 @@
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()
);
}
return manifestDetails;
Expand Down
Loading
Loading