Skip to content

Commit

Permalink
[Failure store] Add failure index retrieval to IndexAbstraction (el…
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli authored Jan 17, 2025
1 parent 4a2abab commit 2ee7ca4
Show file tree
Hide file tree
Showing 61 changed files with 596 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
DataStream original = currentState.getMetadata().dataStreams().get(dataStreamName);
DataStream broken = original.copy()
.setBackingIndices(
original.getBackingIndices()
original.getDataComponent()
.copy()
.setIndices(
List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void setup() throws Exception {
dsBackingIndexName = dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName();
otherDsBackingIndexName = dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName();
fsBackingIndexName = dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName();
fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName();
fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName();

// Will be used in some tests, to test renaming while restoring a snapshot:
ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
Expand Down Expand Up @@ -279,7 +279,7 @@ public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(otherDsBackingIndexName));
backingIndices = dataStreamInfos.get(2).getDataStream().getIndices();
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsBackingIndexName));
List<Index> failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices();
List<Index> failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices();
assertThat(failureIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsFailureIndexName));
}

Expand Down Expand Up @@ -375,7 +375,7 @@ public void testFailureStoreSnapshotAndRestore() throws Exception {
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().get(0).getName());
}
{
// With rename pattern
Expand All @@ -394,7 +394,7 @@ public void testFailureStoreSnapshotAndRestore() throws Exception {
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(fs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().get(0).getName());
}
}

Expand Down Expand Up @@ -587,8 +587,8 @@ public void testSnapshotAndRestoreAll() throws Exception {
assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().size());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().size());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName());

GetAliasesResponse getAliasesResponse = client.admin()
.indices()
Expand Down Expand Up @@ -659,7 +659,7 @@ public void testSnapshotAndRestoreIncludeAliasesFalse() throws Exception {
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName());

GetAliasesResponse getAliasesResponse = client.admin()
.indices()
Expand Down Expand Up @@ -1257,8 +1257,8 @@ public void testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices() {
assertThat(restoreSnapshotResponse.failedShards(), is(0));

GetDataStreamAction.Response.DataStreamInfo dataStream = getDataStreamInfo(dataStreamName).getFirst();
assertThat(dataStream.getDataStream().getBackingIndices().getIndices(), not(empty()));
assertThat(dataStream.getDataStream().getFailureIndices().getIndices(), empty());
assertThat(dataStream.getDataStream().getDataComponent().getIndices(), not(empty()));
assertThat(dataStream.getDataStream().getFailureIndices(), empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE
// Verify that the data stream is marked for rollover and that it has currently one index
DataStream dataStream = getDataStream(dataStreamName);
assertThat(dataStream.rolloverOnWrite(), equalTo(true));
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));
assertThat(dataStream.getDataComponent().getIndices().size(), equalTo(1));

// Introduce a disruption to the master node that should delay the rollover execution
final var barrier = new CyclicBarrier(2);
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onFailure(Exception e) {
// Verify that the rollover has happened once
dataStream = getDataStream(dataStreamName);
assertThat(dataStream.rolloverOnWrite(), equalTo(false));
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(2));
assertThat(dataStream.getDataComponent().getIndices().size(), equalTo(2));
}

private DataStream getDataStream(String dataStreamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private void assertDataStreamBackingIndicesModes(final String dataStreamName, fi
final GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
final DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
final DataStream.DataStreamIndices backingIndices = dataStream.getBackingIndices();
final DataStream.DataStreamIndices backingIndices = dataStream.getDataComponent();
final Iterator<IndexMode> indexModesIterator = modes.iterator();
assertThat(backingIndices.getIndices().size(), Matchers.equalTo(modes.size()));
for (final Index index : backingIndices.getIndices()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1));
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices();
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(failureIndices.size(), equalTo(2));
});

Expand Down Expand Up @@ -1129,7 +1129,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1));
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices();
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(failureIndices.size(), equalTo(1));
assertThat(failureIndices.get(0).getName(), equalTo(secondGenerationIndex));
});
Expand All @@ -1156,14 +1156,7 @@ private static List<String> getFailureIndices(String dataStreamName) {
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams()
.get(0)
.getDataStream()
.getFailureIndices()
.getIndices()
.stream()
.map(Index::getName)
.toList();
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().stream().map(Index::getName).toList();
}

static void indexDocs(String dataStream, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ protected DataStreamsStatsAction.DataStreamShardStats readShardResult(StreamInpu
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
DataStream dataStream = (DataStream) indexAbstraction;
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
dataStream.getBackingIndices().getIndices().stream().map(Index::getName).forEach(index -> {
dataStream.getIndices().stream().map(Index::getName).forEach(index -> {
stats.backingIndices.add(index);
allBackingIndices.add(index);
});
dataStream.getFailureIndices().getIndices().stream().map(Index::getName).forEach(index -> {
dataStream.getFailureIndices().stream().map(Index::getName).forEach(index -> {
stats.backingIndices.add(index);
allBackingIndices.add(index);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ static ClusterState removeDataStream(
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
assert dataStream != null;
backingIndicesToRemove.addAll(dataStream.getIndices());
backingIndicesToRemove.addAll(dataStream.getFailureIndices().getIndices());
backingIndicesToRemove.addAll(dataStream.getFailureIndices());
}

// first delete the data streams and then the indices:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ static GetDataStreamAction.Response innerOperation(
Map<Index, IndexProperties> backingIndicesSettingsValues = new HashMap<>();
Metadata metadata = state.getMetadata();
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getIndices());
if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().getIndices().isEmpty() == false) {
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices().getIndices());
if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices());
}

GetDataStreamAction.Response.TimeSeries timeSeries = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,10 +762,8 @@ static List<Index> getTargetIndices(
targetIndices.add(index);
}
}
if (withFailureStore
&& DataStream.isFailureStoreFeatureFlagEnabled()
&& dataStream.getFailureIndices().getIndices().isEmpty() == false) {
for (Index index : dataStream.getFailureIndices().getIndices()) {
if (withFailureStore && DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
for (Index index : dataStream.getFailureIndices()) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
&& indicesToExcludeForRemainingRun.contains(index) == false) {
targetIndices.add(index);
Expand Down Expand Up @@ -820,7 +818,7 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea

@Nullable
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getFailureStoreWriteIndex() : dataStream.getWriteIndex();
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
if (currentRunWriteIndex == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
).getMetadata();
DataStream d = metadata.dataStreams().get(dataStreamName);
metadata = Metadata.builder(metadata)
.put(d.copy().setReplicated(true).setBackingIndices(d.getBackingIndices().copy().setRolloverOnWrite(false).build()).build())
.put(d.copy().setReplicated(true).setBackingIndices(d.getDataComponent().copy().setRolloverOnWrite(false).build()).build())
.build();

now = now.plus(1, ChronoUnit.HOURS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testOperationsExecutedOnce() {
.toList();
assertThat(deleteRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
assertThat(deleteRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
assertThat(deleteRequests.get(2).indices()[0], is(dataStream.getFailureIndices().getIndices().get(0).getName()));
assertThat(deleteRequests.get(2).indices()[0], is(dataStream.getFailureIndices().get(0).getName()));

// on the second run the rollover and delete requests should not execute anymore
// i.e. the count should *remain* 1 for rollover and 2 for deletes
Expand Down Expand Up @@ -1495,7 +1495,7 @@ public void testTargetIndices() {
).copy().setDataStreamOptions(dataStreamOptions).build(); // failure store is managed even when disabled
builder.put(dataStream);
Metadata metadata = builder.build();
Set<Index> indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().getIndices().get(0));
Set<Index> indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().get(0));
List<Index> targetBackingIndicesOnly = DataStreamLifecycleService.getTargetIndices(
dataStream,
indicesToExclude,
Expand All @@ -1506,9 +1506,7 @@ public void testTargetIndices() {
List<Index> targetIndices = DataStreamLifecycleService.getTargetIndices(dataStream, indicesToExclude, metadata::index, true);
assertThat(
targetIndices,
equalTo(
List.of(dataStream.getIndices().get(1), dataStream.getIndices().get(2), dataStream.getFailureIndices().getIndices().get(1))
)
equalTo(List.of(dataStream.getIndices().get(1), dataStream.getIndices().get(2), dataStream.getFailureIndices().get(1)))
);
}

Expand Down Expand Up @@ -1540,10 +1538,7 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
rolloverFailureIndexRequest.getRolloverTarget(),
is(IndexNameExpressionResolver.combineSelector(dataStreamName, IndexComponentSelector.FAILURES))
);
assertThat(
((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0],
is(dataStream.getFailureIndices().getIndices().get(0).getName())
);
assertThat(((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0], is(dataStream.getFailureIndices().get(0).getName()));
}

public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ ClusterState execute(

final var dataStream = clusterState.metadata().dataStreams().get(request.index());
final var backingIndexName = dataStream.getIndices().get(0).getName();
final var indexNames = dataStream.getFailureIndices().getIndices().isEmpty()
final var indexNames = dataStream.getFailureIndices().isEmpty()
? List.of(backingIndexName)
: List.of(backingIndexName, dataStream.getFailureIndices().getIndices().get(0).getName());
: List.of(backingIndexName, dataStream.getFailureIndices().get(0).getName());
taskContext.success(getAckListener(indexNames, allocationActionMultiListener));
successfulRequests.put(request, indexNames);
return clusterState;
Expand Down
Loading

0 comments on commit 2ee7ca4

Please sign in to comment.