Skip to content

Commit

Permalink
Small cleanup in HealthMetadataService (elastic#98197)
Browse files Browse the repository at this point in the history
Tidies up the nested task/executor classes to remove some indirection,
and implements `describeTasks()` to avoid noise in debug logs.
  • Loading branch information
DaveCTurner authored Aug 4, 2023
1 parent a0cad5f commit fa5caec
Showing 1 changed file with 23 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ private HealthMetadataService(ClusterService clusterService, Settings settings)
this.clusterStateListener = this::updateOnClusterStateChange;
this.enabled = ENABLED_SETTING.get(settings);
this.localHealthMetadata = initialHealthMetadata(settings);
this.taskQueue = clusterService.createTaskQueue(
"health metadata service",
Priority.NORMAL,
new UpsertHealthMetadataTask.Executor()
);
this.taskQueue = clusterService.createTaskQueue("health metadata service", Priority.NORMAL, new Executor());
}

public static HealthMetadataService create(ClusterService clusterService, Settings settings) {
Expand Down Expand Up @@ -128,7 +124,7 @@ private void updateOnHealthNodeEnabledChange(boolean enabled) {
clusterService.addListener(clusterStateListener);

if (canPostClusterStateUpdates(clusterService.state())) {
taskQueue.submitTask("health-node-enabled", () -> this.localHealthMetadata, null);
taskQueue.submitTask("health-node-enabled", new UpsertHealthMetadataTask(), null);
}
} else {
clusterService.removeListener(clusterStateListener);
Expand All @@ -151,8 +147,8 @@ private void updateOnClusterStateChange(ClusterChangedEvent event) {
this.isMaster = event.localNodeMaster();
}
if (canPostClusterStateUpdates(event.state())) {
if (this.localHealthMetadata.equals(HealthMetadata.getFromClusterState(event.state())) == false) {
taskQueue.submitTask("store-local-health-metadata", () -> this.localHealthMetadata, null);
if (localHealthMetadata.equals(HealthMetadata.getFromClusterState(event.state())) == false) {
taskQueue.submitTask("store-local-health-metadata", new UpsertHealthMetadataTask(), null);
}
}
}
Expand All @@ -167,36 +163,36 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
/**
* A base class for health metadata cluster state update tasks.
*/
interface UpsertHealthMetadataTask extends ClusterStateTaskListener {

private static class UpsertHealthMetadataTask implements ClusterStateTaskListener {
@Override
default void onFailure(@Nullable Exception e) {
public void onFailure(@Nullable Exception e) {
logger.log(
MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.WARN,
() -> "failure during health metadata update",
e
);
}
}

default ClusterState execute(ClusterState currentState) {
var initialHealthMetadata = HealthMetadata.getFromClusterState(currentState);
var finalHealthMetadata = latestLocalMetadata();
return finalHealthMetadata.equals(initialHealthMetadata)
? currentState
: currentState.copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, finalHealthMetadata));
private class Executor extends SimpleBatchedExecutor<UpsertHealthMetadataTask, Void> {
@Override
public Tuple<ClusterState, Void> executeTask(UpsertHealthMetadataTask task, ClusterState clusterState) {
final var initialHealthMetadata = HealthMetadata.getFromClusterState(clusterState);
final var finalHealthMetadata = localHealthMetadata; // single volatile read
return Tuple.tuple(
finalHealthMetadata.equals(initialHealthMetadata)
? clusterState
: clusterState.copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, finalHealthMetadata)),
null
);
}

HealthMetadata latestLocalMetadata();

class Executor extends SimpleBatchedExecutor<UpsertHealthMetadataTask, Void> {

@Override
public Tuple<ClusterState, Void> executeTask(UpsertHealthMetadataTask task, ClusterState clusterState) {
return Tuple.tuple(task.execute(clusterState), null);
}
@Override
public void taskSucceeded(UpsertHealthMetadataTask task, Void unused) {}

@Override
public void taskSucceeded(UpsertHealthMetadataTask task, Void unused) {}
@Override
public String describeTasks(List<UpsertHealthMetadataTask> tasks) {
return ""; // tasks are equivalent and idempotent, no need to list them out
}
}

Expand Down

0 comments on commit fa5caec

Please sign in to comment.