Skip to content

Commit

Permalink
Remove timeouts from CreateIndexClusterStateUpdateRequest (elastic#…
Browse files Browse the repository at this point in the history
…113366)

This class is mostly used to carry information during the process of
computing a cluster state update which creates an index, for which the
`masterNodeTimeout` and `ackTimeout` fields are not meaningful. Setting
these fields in those contexts is pointless, but leaving them as `null`
makes it harder to reason about null-propagation. This commit removes
these fields and replaces them with method arguments in the few places
where they actually make sense.
  • Loading branch information
DaveCTurner authored Sep 24, 2024
1 parent 436a5e6 commit 717cd65
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName
request.cause(),
indexName,
request.index()
).ackTimeout(request.ackTimeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout());
).performReroute(false);
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}
Expand All @@ -367,7 +367,7 @@ private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(Strin
request.cause(),
concreteIndexName,
request.index()
).ackTimeout(request.ackTimeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false);
).performReroute(false);

updateRequest.waitForActiveShards(ActiveShardCount.ALL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -25,7 +24,7 @@
/**
* Cluster state update request that allows to create an index
*/
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {
public class CreateIndexClusterStateUpdateRequest {

private final String cause;
private final String index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ protected void masterOperation(
}

createIndexService.createIndex(
request.masterNodeTimeout(),
request.ackTimeout(),
request.ackTimeout(),
updateRequest,
listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
);
Expand All @@ -166,9 +169,7 @@ private CreateIndexClusterStateUpdateRequest buildUpdateRequest(
alias.isHidden(true);
}
}).collect(Collectors.toSet());
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings())
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).settings(request.settings())
.mappings(request.mappings())
.aliases(aliases)
.nameResolvedInstant(nameResolvedAt)
Expand Down Expand Up @@ -196,15 +197,7 @@ private static CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateReques
);
}

final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
cause,
descriptor.getPrimaryIndex(),
request.index()
);

return updateRequest.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout())
.aliases(aliases)
return new CreateIndexClusterStateUpdateRequest(cause, descriptor.getPrimaryIndex(), request.index()).aliases(aliases)
.waitForActiveShards(ActiveShardCount.ALL)
.mappings(descriptor.getMappings())
.settings(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,11 +548,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
if (settings != null) {
b.put(settings);
}
return new CreateIndexClusterStateUpdateRequest(cause, targetIndexName, providedIndexName).ackTimeout(
createIndexRequest.ackTimeout()
)
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(b.build())
return new CreateIndexClusterStateUpdateRequest(cause, targetIndexName, providedIndexName).settings(b.build())
.aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ protected void masterOperation(
return;
}
createIndexService.createIndex(
resizeRequest.masterNodeTimeout(),
resizeRequest.ackTimeout(),
resizeRequest.ackTimeout(),
updateRequest,
delegatedListener.map(
response -> new CreateIndexResponse(
Expand Down Expand Up @@ -234,8 +237,6 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
// mappings are updated on the node when creating in the shards, this prevents race-conditions since all mapping must be
// applied once we took the snapshot and if somebody messes things up and switches the index read/write and adds docs we
// miss the mappings for everything is corrupted and hard to debug
.ackTimeout(targetIndex.ackTimeout())
.masterNodeTimeout(targetIndex.masterNodeTimeout())
.settings(targetIndex.settings())
.aliases(targetIndex.aliases())
.waitForActiveShards(targetIndex.waitForActiveShards())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -254,12 +255,23 @@ public static void validateIndexOrAliasName(String index, BiFunction<String, Str
* the timeout, then {@link ShardsAcknowledgedResponse#isShardsAcknowledged()} will
* return true, otherwise if the operation timed out, then it will return false.
*
* @param masterNodeTimeout timeout on cluster state update in pending task queue
* @param ackTimeout timeout on waiting for all nodes to ack the cluster state update
* @param waitForActiveShardsTimeout timeout for waiting for the {@link ActiveShardCount} specified in
* {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()} to be satisfied.
* May also be {@code null}, in which case it waits forever.
* @param request the index creation cluster state update request
* @param listener the listener on which to send the index creation cluster state update response
*/
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ShardsAcknowledgedResponse> listener) {
public void createIndex(
final TimeValue masterNodeTimeout,
final TimeValue ackTimeout,
@Nullable final TimeValue waitForActiveShardsTimeout,
final CreateIndexClusterStateUpdateRequest request,
final ActionListener<ShardsAcknowledgedResponse> listener
) {
logger.trace("createIndex[{}]", request);
onlyCreateIndex(request, listener.delegateFailureAndWrap((delegate, response) -> {
onlyCreateIndex(masterNodeTimeout, ackTimeout, request, listener.delegateFailureAndWrap((delegate, response) -> {
if (response.isAcknowledged()) {
logger.trace(
"[{}] index creation acknowledged, waiting for active shards [{}]",
Expand All @@ -270,7 +282,7 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request, fina
clusterService,
new String[] { request.index() },
request.waitForActiveShards(),
request.ackTimeout(),
waitForActiveShardsTimeout,
delegate.map(shardsAcknowledged -> {
if (shardsAcknowledged == false) {
logger.debug(
Expand All @@ -290,18 +302,18 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request, fina
}));
}

private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
private void onlyCreateIndex(
final TimeValue masterNodeTimeout,
final TimeValue ackTimeout,
final CreateIndexClusterStateUpdateRequest request,
final ActionListener<AcknowledgedResponse> listener
) {
normalizeRequestSetting(request);

var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
submitUnbatchedTask(
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
new AckedClusterStateUpdateTask(
Priority.URGENT,
request.masterNodeTimeout(),
request.ackTimeout(),
delegate.clusterStateUpdate()
) {
new AckedClusterStateUpdateTask(Priority.URGENT, masterNodeTimeout, ackTimeout, delegate.clusterStateUpdate()) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
Expand Down Expand Up @@ -500,7 +501,13 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
createRequest.waitForActiveShards(ActiveShardCount.ALL)
.mappings(migrationInfo.getMappings())
.settings(Objects.requireNonNullElse(settingsBuilder.build(), Settings.EMPTY));
metadataCreateIndexService.createIndex(createRequest, listener);
metadataCreateIndexService.createIndex(
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
TimeValue.ZERO,
null,
createRequest,
listener
);
}

private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -147,7 +148,13 @@ public void testSystemIndicesCannotBeCreatedUnhidden() {

ArgumentCaptor<Exception> exceptionArgumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(mockListener, times(0)).onResponse(any());
verify(metadataCreateIndexService, times(0)).createIndex(any(), any());
verify(metadataCreateIndexService, times(0)).createIndex(
any(TimeValue.class),
any(TimeValue.class),
any(TimeValue.class),
any(),
any()
);
verify(mockListener, times(1)).onFailure(exceptionArgumentCaptor.capture());

Exception e = exceptionArgumentCaptor.getValue();
Expand All @@ -167,7 +174,13 @@ public void testSystemIndicesCreatedHiddenByDefault() {
CreateIndexClusterStateUpdateRequest.class
);
verify(mockListener, times(0)).onFailure(any());
verify(metadataCreateIndexService, times(1)).createIndex(createRequestArgumentCaptor.capture(), any());
verify(metadataCreateIndexService, times(1)).createIndex(
any(TimeValue.class),
any(TimeValue.class),
any(TimeValue.class),
createRequestArgumentCaptor.capture(),
any()
);

CreateIndexClusterStateUpdateRequest processedRequest = createRequestArgumentCaptor.getValue();
assertTrue(processedRequest.settings().getAsBoolean(SETTING_INDEX_HIDDEN, false));
Expand All @@ -187,7 +200,13 @@ public void testSystemAliasCreatedHiddenByDefault() {
CreateIndexClusterStateUpdateRequest.class
);
verify(mockListener, times(0)).onFailure(any());
verify(metadataCreateIndexService, times(1)).createIndex(createRequestArgumentCaptor.capture(), any());
verify(metadataCreateIndexService, times(1)).createIndex(
any(TimeValue.class),
any(TimeValue.class),
any(TimeValue.class),
createRequestArgumentCaptor.capture(),
any()
);

CreateIndexClusterStateUpdateRequest processedRequest = createRequestArgumentCaptor.getValue();
assertTrue(processedRequest.aliases().contains(new Alias(SYSTEM_ALIAS_NAME).isHidden(true)));
Expand Down

0 comments on commit 717cd65

Please sign in to comment.