Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread Context POC changes #5

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,17 @@ public class FollowersChecker {
// the time between checks sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
);

// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(6000000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ public class LeaderChecker {
// the time between checks sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
"cluster.fault_detection.leader_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
);

// the timeout for each check sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(600000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.tasks.ResourceUsageStatsTCPropagator;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskThreadContextStatePropagator;

Expand Down Expand Up @@ -128,7 +129,7 @@ public ThreadContext(Settings settings) {
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(), new ResourceUsageStatsTCPropagator()));
}

public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.annotation.PublicApi;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -26,7 +27,7 @@ public interface ThreadContextStatePropagator {
* @param source current context transient headers
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
@Deprecated(since = "2.12.0", forRemoval = true)
// @Deprecated(since = "2.12.0", forRemoval = true)
Map<String, Object> transients(Map<String, Object> source);

/**
Expand All @@ -46,7 +47,7 @@ default Map<String, Object> transients(Map<String, Object> source, boolean isSys
* @param source current context headers
* @return the list of request headers that needs to be propagated from current context to request
*/
@Deprecated(since = "2.12.0", forRemoval = true)
// @Deprecated(since = "2.12.0", forRemoval = true)
Map<String, String> headers(Map<String, Object> source);

/**
Expand Down
11 changes: 7 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,8 @@ protected Node(
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
admissionControlService,
threadPool
);

List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
Expand Down Expand Up @@ -981,7 +982,8 @@ protected Node(
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders,
tracer
tracer,
resourceUsageCollectorService
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
Expand Down Expand Up @@ -1344,9 +1346,10 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer
Tracer tracer,
ResourceUsageCollectorService resourceUsageCollectorService
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer, resourceUsageCollectorService);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(")");
return sb.toString();
return nodeId + ":" +
timestamp +
"," +
memoryUtilizationPercent +
"," +
cpuUtilizationPercent;
}

NodeResourceUsageStats(NodeResourceUsageStats nodeResourceUsageStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public Optional<NodeResourceUsageStats> getNodeStatistics(final String nodeId) {
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}

public Optional<NodeResourceUsageStats> getLocalNodeStatistics() {
if(clusterService.state() != null) {
return Optional.ofNullable(nodeIdToResourceUsageStats.get(clusterService.state().nodes().getLocalNodeId()))
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}
return Optional.empty();
}
/**
* Returns collected resource usage statistics of all nodes
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.*;

import java.io.IOException;

public class AdmissionControlInterceptSender {

ThreadPool threadPool;
public AdmissionControlInterceptSender(ThreadPool threadPool) {
this.threadPool = threadPool;
}
private static final Logger logger = LogManager.getLogger(AdmissionControlInterceptSender.class);
public <T extends TransportResponse> void sendRequestDecorate(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
try (ThreadContext.StoredContext stashedContext = this.getThreadContext().stashContext()) {
if(isActionIndexingOrSearch(action)){
logger.info("AdmissionControlInterceptSender is Triggered Action: {}", action);
}
RestoringTransportResponseHandler restoringTransportResponseHandler = new RestoringTransportResponseHandler(handler, stashedContext, action);
sender.sendRequest(connection, action, request, options, restoringTransportResponseHandler);
}
}

private boolean isActionIndexingOrSearch(String action) {
return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk");
}

private ThreadContext getThreadContext() {
return threadPool.getThreadContext();
}

private static class RestoringTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {

private final ThreadContext.StoredContext contextToRestore;
private final TransportResponseHandler<T> innerHandler;

private final String action;

private RestoringTransportResponseHandler(TransportResponseHandler<T> innerHandler, ThreadContext.StoredContext contextToRestore, String action) {
this.contextToRestore = contextToRestore;
this.innerHandler = innerHandler;
this.action = action;
}

@Override
public T read(StreamInput in) throws IOException {
return innerHandler.read(in);
}

@Override
public void handleResponse(T response) {
if (this.isActionIndexingOrSearch(this.action)){
logger.info("Handle Response Triggered in: RestoringTransportResponseHandler");
}
contextToRestore.restore();
innerHandler.handleResponse(response);
}

@Override
public void handleException(TransportException e) {
contextToRestore.restore();
innerHandler.handleException(e);
}

@Override
public String executor() {
return innerHandler.executor();
}

private boolean isActionIndexingOrSearch(String action) {
return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,27 @@

package org.opensearch.ratelimitting.admissioncontrol.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.*;

/**
* This class allows throttling by intercepting requests on both the sender and the receiver side.
*/
public class AdmissionControlTransportInterceptor implements TransportInterceptor {

AdmissionControlService admissionControlService;
AdmissionControlInterceptSender admissionControlInterceptSender;

public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) {
private static final Logger logger = LogManager.getLogger(AdmissionControlTransportInterceptor.class);

public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService, ThreadPool threadPool) {
this.admissionControlService = admissionControlService;
admissionControlInterceptSender = new AdmissionControlInterceptSender(threadPool);
}

/**
Expand All @@ -45,4 +51,21 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
admissionControlActionType
);
}

@Override
public AsyncSender interceptSender(AsyncSender sender) {
logger.info("AdmissionControl Intercept Sender Initialised");
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
admissionControlInterceptSender.sendRequestDecorate(sender, connection, action, request, options, handler);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator {
public static final String NODE_RESOURCE_STATS = "PERF_STATS";
@Override
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue());
}
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> headers = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
headers.put(entry.getKey(), entry.getValue().toString());
}
}
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

public class ResourceUsageStatsReference {
private String resourceUsageStats;

public ResourceUsageStatsReference(String stats) {
this.resourceUsageStats = stats;
}

public String getResourceUsageStats() {
return resourceUsageStats;
}

public void setResourceUsageStats(String stats) {
this.resourceUsageStats = new String(stats);
}

@Override
public String toString() {
return this.resourceUsageStats;
}

}
Loading
Loading