Skip to content

Commit

Permalink
Fix ILMDownsampleDisruptionIT.testILMDownsampleRollingRestart (elastic#119196)
Browse files Browse the repository at this point in the history

This removes a redundant thread creation when triggering a rolling
restart as the method is already async and drops the check for cluster
health as that might hit a node that's being shut down (the master node
in particular).
  • Loading branch information
nielsbauman authored Jan 9, 2025
1 parent be0d4d9 commit 2153cac
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 93 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ tests:
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
method: testAllocationPreventedForRemoval
issue: https://github.com/elastic/elasticsearch/issues/116363
- class: org.elasticsearch.xpack.downsample.ILMDownsampleDisruptionIT
method: testILMDownsampleRollingRestart
issue: https://github.com/elastic/elasticsearch/issues/114233
- class: org.elasticsearch.reservedstate.service.RepositoriesFileSettingsIT
method: testSettingsApplied
issue: https://github.com/elastic/elasticsearch/issues/116694
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
Expand Down Expand Up @@ -57,9 +55,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -144,7 +140,7 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas,

public void testILMDownsampleRollingRestart() throws Exception {
final InternalTestCluster cluster = internalCluster();
final List<String> masterNodes = cluster.startMasterOnlyNodes(1);
cluster.startMasterOnlyNodes(1);
cluster.startDataOnlyNodes(3);
ensureStableCluster(cluster.size());
ensureGreen();
Expand All @@ -169,46 +165,16 @@ public void testILMDownsampleRollingRestart() throws Exception {
.endObject();
};
int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT);
final CountDownLatch disruptionStart = new CountDownLatch(1);
final CountDownLatch disruptionEnd = new CountDownLatch(1);

new Thread(new Disruptor(cluster, sourceIndex, new DisruptionListener() {
@Override
public void disruptionStart() {
disruptionStart.countDown();
}

@Override
public void disruptionEnd() {
disruptionEnd.countDown();
}
}, masterNodes.get(0), (ignored) -> {
try {
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
@Override
public boolean validateClusterForming() {
return true;
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
})).start();
cluster.rollingRestart(new InternalTestCluster.RestartCallback());

final String targetIndex = "downsample-1h-" + sourceIndex;
startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd);
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
ensureStableCluster(cluster.numDataAndMasterNodes());
assertTargetIndex(cluster, targetIndex, indexedDocs);
startDownsampleTaskViaIlm(sourceIndex, targetIndex);
assertBusy(() -> assertTargetIndex(cluster, targetIndex, indexedDocs));
ensureGreen(targetIndex);
}

private void startDownsampleTaskViaIlm(
String sourceIndex,
String targetIndex,
CountDownLatch disruptionStart,
CountDownLatch disruptionEnd
) throws Exception {
disruptionStart.await();
private void startDownsampleTaskViaIlm(String sourceIndex, String targetIndex) throws Exception {
var request = new UpdateSettingsRequest(sourceIndex).settings(
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME)
);
Expand All @@ -231,7 +197,6 @@ private void startDownsampleTaskViaIlm(
var getSettingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(targetIndex)).actionGet();
assertThat(getSettingsResponse.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()), equalTo("success"));
}, 60, TimeUnit.SECONDS);
disruptionEnd.await();
}

private void assertTargetIndex(final InternalTestCluster cluster, final String targetIndex, int indexedDocs) {
Expand Down Expand Up @@ -294,53 +259,4 @@ private String randomDateForRange(long start, long end) {
/**
 * Single-method supplier whose {@code get()} returns an {@link XContentBuilder}
 * and is allowed to throw {@link IOException}.
 * NOTE(review): presumably builds the source of one document for bulk indexing
 * (a supplier is passed to {@code bulkIndex} in the test) -- confirm against bulkIndex.
 */
public interface SourceSupplier {
XContentBuilder get() throws IOException;
}

/**
 * Callbacks that mark the beginning and the end of a disruption executed by {@code Disruptor#run}.
 */
interface DisruptionListener {
// Invoked first thing in Disruptor#run, before the disruption is attempted.
void disruptionStart();

// Invoked from the finally block of Disruptor#run, so it fires whether or not the disruption failed.
void disruptionEnd();
}

/**
 * Runnable that resolves a node name for {@code sourceIndex}, hands it to the supplied
 * {@code disruption} consumer, then waits for the cluster to recover, notifying a
 * {@link DisruptionListener} at the start and at the end of the run.
 * Declared as a non-static inner class; it calls the unqualified helpers {@code safeExecute},
 * {@code ensureGreen}, {@code ensureStableCluster} and {@code logger}, which are defined outside
 * this class (presumably on the enclosing test class or its parents -- confirm).
 */
private class Disruptor implements Runnable {
// Test cluster used to obtain a client and the expected node count.
final InternalTestCluster cluster;
// Index whose search shards are looked up to choose the candidate node.
private final String sourceIndex;
// Notified when run() starts and when it finishes.
private final DisruptionListener listener;
// Name of the node whose client issues the search-shards request and the stability check.
private final String clientNode;
// The disruption to apply; receives the candidate node's name.
private final Consumer<String> disruption;

// Stores the collaborators as-is; no validation or work is done here.
private Disruptor(
final InternalTestCluster cluster,
final String sourceIndex,
final DisruptionListener listener,
final String clientNode,
final Consumer<String> disruption
) {
this.cluster = cluster;
this.sourceIndex = sourceIndex;
this.listener = listener;
this.clientNode = clientNode;
this.disruption = disruption;
}

/**
 * Signals the start, applies the disruption to the first node reported for {@code sourceIndex},
 * waits for the index to be green and the cluster to be stable, and always signals the end.
 * Any exception is logged and not rethrown.
 */
@Override
public void run() {
// Signal the start before any work that can fail.
listener.disruptionStart();
try {
// Ask, through clientNode's client, for the search shards of sourceIndex and take the name of
// the first node in the response. An empty node array would throw here and land in the catch below.
final String candidateNode = safeExecute(
cluster.client(clientNode),
TransportClusterSearchShardsAction.TYPE,
new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, sourceIndex)
).getNodes()[0].getName();
logger.info("Candidate node [" + candidateNode + "]");
disruption.accept(candidateNode);
// After the disruption, wait for the source index and the cluster to settle.
ensureGreen(sourceIndex);
ensureStableCluster(cluster.numDataAndMasterNodes(), clientNode);

} catch (Exception e) {
// Deliberate best-effort: only the exception message is logged (no stack trace) and nothing is rethrown.
logger.error("Ignoring Error while injecting disruption [" + e.getMessage() + "]");
} finally {
// Always signal the end so a waiter on the listener is released even after a failure.
listener.disruptionEnd();
}
}
}
}

0 comments on commit 2153cac

Please sign in to comment.