Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wlm resiliency orchestrator (query group service) #15925

Merged
merged 54 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a720136
cancellation related
kiranprakash154 Aug 7, 2024
83e20c0
Update CHANGELOG.md
kiranprakash154 Aug 7, 2024
9983c73
add better cancellation reason
kiranprakash154 Aug 7, 2024
245ee5d
Update DefaultTaskCancellationTests.java
kiranprakash154 Aug 7, 2024
0771fd2
refactor
kiranprakash154 Aug 21, 2024
4b1ef81
refactor
kiranprakash154 Aug 26, 2024
3ea44d7
Update DefaultTaskCancellation.java
kiranprakash154 Aug 27, 2024
0103089
Update DefaultTaskCancellation.java
kiranprakash154 Aug 29, 2024
092d715
Update DefaultTaskCancellation.java
kiranprakash154 Aug 29, 2024
4a2c51e
Update DefaultTaskSelectionStrategy.java
kiranprakash154 Aug 29, 2024
cbb51bd
refactor
kiranprakash154 Aug 29, 2024
4e846e2
refactor node level threshold
kiranprakash154 Aug 29, 2024
241b036
Merge branch 'main' into kp/wlm-cancellation-1
kaushalmahi12 Aug 30, 2024
7511d99
use query group task
kaushalmahi12 Aug 30, 2024
498743a
code clean up and refactorings
kaushalmahi12 Sep 3, 2024
e26e525
add unit tests and fix existing ones
kaushalmahi12 Sep 4, 2024
0ff2b09
uncomment the test case
kaushalmahi12 Sep 4, 2024
ddb8dce
update CHANGELOG
kaushalmahi12 Sep 4, 2024
3528054
Merge branch 'main' into feature/wlm-cancellation
kaushalmahi12 Sep 4, 2024
e8366a5
fix imports
kaushalmahi12 Sep 4, 2024
d2d02e3
add queryGroupService
kaushalmahi12 Sep 4, 2024
448ea41
refactor and add UTs for new constructs
kaushalmahi12 Sep 5, 2024
3fc21be
fix javadocs
kaushalmahi12 Sep 5, 2024
fe02a6a
remove code clutter
kaushalmahi12 Sep 6, 2024
8aede33
change annotation version and task selection strategy
kaushalmahi12 Sep 6, 2024
623f6f8
rename a util class
kaushalmahi12 Sep 6, 2024
9e2e3ea
remove wrappers from resource type
kaushalmahi12 Sep 6, 2024
34184ef
apply spotless
kaushalmahi12 Sep 6, 2024
91893e7
address comments
kaushalmahi12 Sep 9, 2024
66e43b2
add rename changes
kaushalmahi12 Sep 9, 2024
a6b1afd
Merge branch 'main' into feature/wlm-cancellation
kaushalmahi12 Sep 9, 2024
981b15f
address comments
kaushalmahi12 Sep 9, 2024
16b6dff
Merge branch 'feature/wlm-cancellation' into feature/wlm-service
kaushalmahi12 Sep 10, 2024
0be1023
initial changes
kaushalmahi12 Sep 10, 2024
caf5914
refactor changes and logical bug fix
kaushalmahi12 Sep 10, 2024
6381e17
add chanegs
kaushalmahi12 Sep 11, 2024
b78ca02
address comments
kaushalmahi12 Sep 11, 2024
66e18b9
Merge branch 'feature/wlm-cancellation' into feature/wlm-service
kaushalmahi12 Sep 11, 2024
bec1ece
temp changes
kaushalmahi12 Sep 11, 2024
1e76ede
add UTs
kaushalmahi12 Sep 13, 2024
7c5a48e
Merge branch 'main' into feature/wlm-service
kaushalmahi12 Sep 13, 2024
931f6bf
add changelog
kaushalmahi12 Sep 13, 2024
3f4d590
add task completion listener hook
kaushalmahi12 Sep 13, 2024
501e5e9
add remaining pieces to make the feature functional
kaushalmahi12 Sep 13, 2024
2292fcd
extend stats and fix bugs
kaushalmahi12 Sep 16, 2024
139404a
fix bugs and add logic to make SBP work with wlm
kaushalmahi12 Sep 19, 2024
56c3393
address comments
kaushalmahi12 Oct 1, 2024
baa39c5
fix bugs and SBP ITs
kaushalmahi12 Oct 1, 2024
3c6adae
add missed applyCluster state change
kaushalmahi12 Oct 1, 2024
040ed4c
Merge branch 'main' into feature/wlm-service
kaushalmahi12 Oct 7, 2024
862af7b
address comments
kaushalmahi12 Oct 7, 2024
e2b203c
decouple queryGroupService and cancellationService
kaushalmahi12 Oct 7, 2024
623078a
replace StateApplier with StateListener interface
kaushalmahi12 Oct 7, 2024
2d7316b
fix precommit errors
kaushalmahi12 Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- [Workload Management] Add orchestrator for wlm resiliency (QueryGroupService) ([#15925](https://github.com/opensearch-project/OpenSearch/pull/15925))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -559,7 +560,10 @@ public ActionModule(
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(QueryGroupTask.QUERY_GROUP_ID_HEADER, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,10 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.WLM_MODE_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_DURESS_STREAK_SETTING
)
)
);
Expand Down
30 changes: 27 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.cancellation.MaximumResourceTaskSelectionStrategy;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1026,8 +1030,26 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(
settings,
settingsModule.getClusterSettings()
);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService
),
clusterService,
threadPool,
workloadManagementSettings
);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
Expand Down Expand Up @@ -1093,7 +1115,7 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
queryGroupService
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down Expand Up @@ -1400,6 +1422,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(QueryGroupService.class).toInstance(queryGroupService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
Expand Down Expand Up @@ -1593,6 +1616,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
injector.getInstance(QueryGroupService.class).start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down
241 changes: 226 additions & 15 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,193 @@

package org.opensearch.wlm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService.TRACKED_RESOURCES;

/**
* As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes
*/
public class QueryGroupService {
public class QueryGroupService extends AbstractLifecycleComponent
implements
ClusterStateApplier,
TaskResourceTrackingService.TaskCompletionListener {
// This map does not need to be concurrent since we will process the cluster state change serially and update
// this map with new additions and deletions of entries. QueryGroupState is thread safe
private final Map<String, QueryGroupState> queryGroupStateMap;
private static final Logger logger = LogManager.getLogger(QueryGroupService.class);

private final QueryGroupTaskCancellationService taskCancellationService;
private volatile Scheduler.Cancellable scheduledFuture;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final WorkloadManagementSettings workloadManagementSettings;
private final Set<QueryGroup> activeQueryGroups;
private final Set<QueryGroup> deletedQueryGroups;
private final NodeDuressTrackers nodeDuressTrackers;

public QueryGroupService(
QueryGroupTaskCancellationService taskCancellationService,
ClusterService clusterService,
ThreadPool threadPool,
WorkloadManagementSettings workloadManagementSettings
) {

this(
taskCancellationService,
clusterService,
threadPool,
workloadManagementSettings,
new NodeDuressTrackers(
Map.of(
ResourceType.CPU,
new NodeDuressTracker(
() -> workloadManagementSettings.getNodeLevelCpuCancellationThreshold() < ProcessProbe.getInstance()
.getProcessCpuPercent() / 100.0,
workloadManagementSettings::getDuressStreak
),
ResourceType.MEMORY,
new NodeDuressTracker(
() -> workloadManagementSettings.getNodeLevelMemoryCancellationThreshold() <= JvmStats.jvmStats()
.getMem()
.getHeapUsedPercent() / 100.0,
workloadManagementSettings::getDuressStreak
)
)
),
new HashMap<>(),
new HashSet<>(),
new HashSet<>()
);
}

public QueryGroupService(
QueryGroupTaskCancellationService taskCancellationService,
ClusterService clusterService,
ThreadPool threadPool,
WorkloadManagementSettings workloadManagementSettings,
NodeDuressTrackers nodeDuressTrackers,
Map<String, QueryGroupState> stateMap,
Set<QueryGroup> activeQueryGroups,
Set<QueryGroup> deletedQueryGroups
) {
this.taskCancellationService = taskCancellationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.workloadManagementSettings = workloadManagementSettings;
this.nodeDuressTrackers = nodeDuressTrackers;
this.activeQueryGroups = activeQueryGroups;
this.deletedQueryGroups = deletedQueryGroups;
activeQueryGroups.forEach(queryGroup -> stateMap.putIfAbsent(queryGroup.get_id(), new QueryGroupState()));
this.queryGroupStateMap = stateMap;
this.queryGroupStateMap.put(QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get(), new QueryGroupState());
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
taskCancellationService.setQueryGroupStateMapAccessor(this::getQueryGroupState);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
clusterService.addStateApplier(this);
}

public QueryGroupService() {
this(new HashMap<>());
/**
* run at regular interval
*/
protected void doRun() {
if (workloadManagementSettings.getWlmMode() == WlmMode.DISABLED) {
return;
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
}
taskCancellationService.refreshQueryGroups(activeQueryGroups, deletedQueryGroups);
taskCancellationService.cancelTasks(nodeDuressTrackers::isNodeInDuress);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
taskCancellationService.pruneDeletedQueryGroups();
}

public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
this.queryGroupStateMap = queryGroupStateMap;
private QueryGroupState getQueryGroupState(final String queryGroupId) {
return queryGroupStateMap.get(queryGroupId);
}

/**
* {@link AbstractLifecycleComponent} lifecycle method
*/
@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
logger.debug("Exception occurred in Query Sandbox service", e);
}
}, this.workloadManagementSettings.getQueryGroupServiceRunInterval(), ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
}

@Override
protected void doClose() throws IOException {}

Check warning on line 160 in server/src/main/java/org/opensearch/wlm/QueryGroupService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/wlm/QueryGroupService.java#L160

Added line #L160 was not covered by tests

@Override
public void applyClusterState(ClusterChangedEvent event) {
// Retrieve the current and previous cluster states
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
Metadata previousMetadata = event.previousState().metadata();
Metadata currentMetadata = event.state().metadata();

// Extract the query groups from both the current and previous cluster states
Map<String, QueryGroup> previousQueryGroups = previousMetadata.queryGroups();
Map<String, QueryGroup> currentQueryGroups = currentMetadata.queryGroups();
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

// Detect new query groups added in the current cluster state
for (String queryGroupName : currentQueryGroups.keySet()) {
if (!previousQueryGroups.containsKey(queryGroupName)) {
// New query group detected
QueryGroup newQueryGroup = currentQueryGroups.get(queryGroupName);
// Perform any necessary actions with the new query group
this.activeQueryGroups.add(newQueryGroup);
queryGroupStateMap.put(newQueryGroup.get_id(), new QueryGroupState());
}
}

// Detect query groups deleted in the current cluster state
for (String queryGroupName : previousQueryGroups.keySet()) {
if (!currentQueryGroups.containsKey(queryGroupName)) {
// Query group deleted
QueryGroup deletedQueryGroup = previousQueryGroups.get(queryGroupName);
// Perform any necessary actions with the deleted query group
this.deletedQueryGroups.add(deletedQueryGroup);
queryGroupStateMap.remove(deletedQueryGroup.get_id());
}
}
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* updates the failure stats for the query group
*
* @param queryGroupId query group identifier
*/
public void incrementFailuresFor(final String queryGroupId) {
Expand All @@ -47,7 +208,6 @@
}

/**
*
* @return node level query group stats
*/
public QueryGroupStats nodeStats() {
Expand All @@ -63,18 +223,69 @@
}

/**
*
* @param queryGroupId query group identifier
*/
public void rejectIfNeeded(String queryGroupId) {
if (queryGroupId == null) return;
boolean reject = false;
final StringBuilder reason = new StringBuilder();
// TODO: At this point this is dummy and we need to decide whether to cancel the request based on last
// reported resource usage for the queryGroup. We also need to increment the rejection count here for the
// query group
if (reject) {
throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.toString());
if (queryGroupId == null || queryGroupId.equals(QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get())) return;
QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId);

// This can happen if the request failed for a deleted query group
// or new queryGroup is being created and has not been acknowledged yet
if (queryGroupState == null) {
return;

Check warning on line 235 in server/src/main/java/org/opensearch/wlm/QueryGroupService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/wlm/QueryGroupService.java#L235

Added line #L235 was not covered by tests
}

// rejections will not happen for SOFT mode QueryGroups
Optional<QueryGroup> optionalQueryGroup = activeQueryGroups.stream().filter(x -> x.get_id().equals(queryGroupId)).findFirst();

if (optionalQueryGroup.isPresent() && optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT)
return;

optionalQueryGroup.ifPresent(queryGroup -> {
boolean reject = false;
final StringBuilder reason = new StringBuilder();
for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double threshold = queryGroup.getResourceLimits().get(resourceType);
final double lastRecordedUsage = queryGroupState.getResourceState().get(resourceType).getLastRecordedUsage();
if (threshold < lastRecordedUsage) {
reject = true;
reason.append(resourceType)
.append(" limit is breaching for ENFORCED type QueryGroup: (")
.append(threshold)
.append(" < ")
.append(lastRecordedUsage)
.append("). ");
queryGroupState.getResourceState().get(resourceType).rejections.inc();
// should not double count even if both the resource limits are breaching
break;
}
}
}
if (reject) {
queryGroupState.totalRejections.inc();
throw new OpenSearchRejectedExecutionException(
"QueryGroup " + queryGroupId + " is already contended. " + reason.toString()
);
}
});
}

public Set<QueryGroup> getActiveQueryGroups() {
return activeQueryGroups;
}

public Set<QueryGroup> getDeletedQueryGroups() {
return deletedQueryGroups;
}

@Override
public void onTaskCompleted(Task task) {
if (!(task instanceof QueryGroupTask)) {
return;
}
final QueryGroupTask queryGroupTask = (QueryGroupTask) task;
final String queryGroupId = queryGroupTask.getQueryGroupId();
queryGroupStateMap.get(queryGroupId).completions.inc();
}
}
Loading
Loading