Skip to content

Commit

Permalink
Add failure store generation field
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbauman committed Mar 18, 2024
1 parent d84b46c commit 04f1c54
Show file tree
Hide file tree
Showing 16 changed files with 120 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
original.getLifecycle(),
original.isFailureStore(),
original.getFailureIndices(),
null
null,
original.getFailureStoreGeneration()
);
brokenDataStreamHolder.set(broken);
return ClusterState.builder(currentState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ public void testGetAdditionalIndexSettingsDataStreamAlreadyCreatedTimeSettingsMi
ds.getLifecycle(),
ds.isFailureStore(),
ds.getFailureIndices(),
null
null,
ds.getFailureStoreGeneration()
)
);
Metadata metadata = mb.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
d.getLifecycle(),
d.isFailureStore(),
d.getFailureIndices(),
null
null,
d.getFailureStoreGeneration()
)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
new DataStreamLifecycle(),
true,
failureStores,
null
null,
failureStores.size() + 1
);

String ilmPolicyName = "rollover-30days";
Expand Down Expand Up @@ -200,7 +201,8 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
new DataStreamLifecycle(null, null, false),
true,
failureStores,
null
null,
failureStores.size() + 1
);

String ilmPolicyName = "rollover-30days";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() {
DataStreamLifecycle.newBuilder().dataRetention(0L).build(),
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
null
null,
dataStream.getFailureStoreGeneration()
)
);
clusterState = ClusterState.builder(clusterState).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ static TransportVersion def(int id) {
public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);
public static final TransportVersion AUTO_SHARDING_ROLLOVER_CONDITION = def(8_611_00_0);
public static final TransportVersion FAILURE_STORE_ROLLOVER = def(8_612_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public static boolean isFailureStoreEnabled() {
private final DataStreamLifecycle lifecycle;
private final boolean rolloverOnWrite;
private final boolean failureStore;
private final long failureStoreGeneration;
private final List<Index> failureIndices;
private volatile Set<String> failureStoreLookup;
@Nullable
Expand All @@ -131,7 +132,8 @@ public DataStream(
DataStreamLifecycle lifecycle,
boolean failureStore,
List<Index> failureIndices,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
@Nullable DataStreamAutoShardingEvent autoShardingEvent,
long failureStoreGeneration
) {
this(
name,
Expand All @@ -148,7 +150,8 @@ public DataStream(
failureStore,
failureIndices,
false,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand All @@ -166,7 +169,8 @@ public DataStream(
boolean failureStore,
List<Index> failureIndices,
boolean rolloverOnWrite,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
@Nullable DataStreamAutoShardingEvent autoShardingEvent,
long failureStoreGeneration
) {
this(
name,
Expand All @@ -183,7 +187,8 @@ public DataStream(
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand All @@ -203,7 +208,8 @@ public DataStream(
boolean failureStore,
List<Index> failureIndices,
boolean rolloverOnWrite,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
@Nullable DataStreamAutoShardingEvent autoShardingEvent,
long failureStoreGeneration
) {
this.name = name;
this.indices = List.copyOf(indices);
Expand All @@ -219,6 +225,7 @@ public DataStream(
this.indexMode = indexMode;
this.lifecycle = lifecycle;
this.failureStore = failureStore;
this.failureStoreGeneration = failureStoreGeneration;
this.failureIndices = failureIndices;
assert assertConsistent(this.indices);
this.rolloverOnWrite = rolloverOnWrite;
Expand All @@ -237,7 +244,22 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode
) {
this(name, indices, generation, metadata, hidden, replicated, system, allowCustomRouting, indexMode, null, false, List.of(), null);
this(
name,
indices,
generation,
metadata,
hidden,
replicated,
system,
allowCustomRouting,
indexMode,
null,
false,
List.of(),
null,
1
);
}

private static boolean assertConsistent(List<Index> indices) {
Expand Down Expand Up @@ -274,6 +296,10 @@ public long getGeneration() {
return generation;
}

/**
 * Returns the current generation of this data stream's failure store.
 * Incremented when the failure store's write index rolls over.
 */
public long getFailureStoreGeneration() {
    return this.failureStoreGeneration;
}

/**
 * Returns the indices backing this data stream's failure store.
 */
public List<Index> getFailureIndices() {
    return this.failureIndices;
}
Expand Down Expand Up @@ -506,7 +532,8 @@ public DataStream unsafeRollover(Index writeIndex, long generation, boolean time
lifecycle,
failureStore,
failureIndices,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand Down Expand Up @@ -585,7 +612,8 @@ public DataStream removeBackingIndex(Index index) {
lifecycle,
failureStore,
failureIndices,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand Down Expand Up @@ -631,7 +659,8 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
lifecycle,
failureStore,
failureIndices,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand Down Expand Up @@ -692,7 +721,8 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
lifecycle,
failureStore,
failureIndices,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand All @@ -712,7 +742,8 @@ public DataStream promoteDataStream() {
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand Down Expand Up @@ -749,7 +780,8 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
lifecycle,
failureStore,
failureIndices,
autoShardingEvent
autoShardingEvent,
failureStoreGeneration
);
}

Expand Down Expand Up @@ -971,7 +1003,8 @@ public DataStream(StreamInput in) throws IOException {
in.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED) ? in.readBoolean() : false,
in.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)
? in.readOptionalWriteable(DataStreamAutoShardingEvent::new)
: null
: null,
in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ROLLOVER) ? in.readVLong() : 1
);
}

Expand Down Expand Up @@ -1018,6 +1051,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
out.writeOptionalWriteable(autoShardingEvent);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ROLLOVER)) {
out.writeVLong(failureStoreGeneration);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
Expand All @@ -1032,6 +1068,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField INDEX_MODE = new ParseField("index_mode");
public static final ParseField LIFECYCLE = new ParseField("lifecycle");
public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store");
public static final ParseField FAILURE_STORE_GENERATION_FIELD = new ParseField("failure_store_generation");
public static final ParseField FAILURE_INDICES_FIELD = new ParseField("failure_indices");
public static final ParseField ROLLOVER_ON_WRITE_FIELD = new ParseField("rollover_on_write");
public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding");
Expand All @@ -1042,6 +1079,7 @@ public void writeTo(StreamOutput out) throws IOException {
// Until the feature flag is removed we keep them separately to be mindful of this.
boolean failureStoreEnabled = DataStream.isFailureStoreEnabled() && args[12] != null && (boolean) args[12];
List<Index> failureStoreIndices = DataStream.isFailureStoreEnabled() && args[13] != null ? (List<Index>) args[13] : List.of();
long failureStoreGeneration = DataStream.isFailureStoreEnabled() && args[14] != null ? (long) args[14] : 1;
return new DataStream(
(String) args[0],
(List<Index>) args[1],
Expand All @@ -1056,7 +1094,8 @@ public void writeTo(StreamOutput out) throws IOException {
failureStoreEnabled,
failureStoreIndices,
args[10] != null && (boolean) args[10],
(DataStreamAutoShardingEvent) args[11]
(DataStreamAutoShardingEvent) args[11],
failureStoreGeneration
);
});

Expand Down Expand Up @@ -1093,6 +1132,7 @@ public void writeTo(StreamOutput out) throws IOException {
(p, c) -> Index.fromXContent(p),
FAILURE_INDICES_FIELD
);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), FAILURE_STORE_GENERATION_FIELD);
}
}

Expand Down Expand Up @@ -1124,6 +1164,7 @@ public XContentBuilder toXContent(
builder.field(GENERATION_FIELD.getPreferredName(), generation);
if (DataStream.isFailureStoreEnabled() && failureIndices.isEmpty() == false) {
builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices);
builder.field(FAILURE_STORE_GENERATION_FIELD.getPreferredName(), failureStoreGeneration);
}
if (metadata != null) {
builder.field(METADATA_FIELD.getPreferredName(), metadata);
Expand Down Expand Up @@ -1168,6 +1209,7 @@ public boolean equals(Object o) {
&& indexMode == that.indexMode
&& Objects.equals(lifecycle, that.lifecycle)
&& failureStore == that.failureStore
&& failureStoreGeneration == that.failureStoreGeneration
&& failureIndices.equals(that.failureIndices)
&& rolloverOnWrite == that.rolloverOnWrite
&& Objects.equals(autoShardingEvent, that.autoShardingEvent);
Expand All @@ -1187,6 +1229,7 @@ public int hashCode() {
indexMode,
lifecycle,
failureStore,
failureStoreGeneration,
failureIndices,
rolloverOnWrite,
autoShardingEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ static ClusterState createDataStream(
lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle,
template.getDataStreamTemplate().hasFailureStore(),
failureIndices,
null
null,
1
);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ static ClusterState updateDataLifecycle(
lifecycle,
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
dataStream.getAutoShardingEvent()
dataStream.getAutoShardingEvent(),
dataStream.getFailureStoreGeneration()
)
);
}
Expand Down Expand Up @@ -251,7 +252,8 @@ public static ClusterState setRolloverOnWrite(ClusterState currentState, String
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
rolloverOnWrite,
dataStream.getAutoShardingEvent()
dataStream.getAutoShardingEvent(),
dataStream.getFailureStoreGeneration()
)
);
return ClusterState.builder(currentState).metadata(builder.build()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
dataStream.getLifecycle(),
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
dataStream.getAutoShardingEvent()
dataStream.getAutoShardingEvent(),
dataStream.getFailureStoreGeneration()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,8 @@ private DataStream createDataStream(
null,
false,
List.of(),
autoShardingEvent
autoShardingEvent,
1
);
}

Expand Down
Loading

0 comments on commit 04f1c54

Please sign in to comment.