Skip to content

Commit

Permalink
Resource usage propagator changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Dec 25, 2023
1 parent 327b1dc commit a891884
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.tasks.ResourceUsageStatsTCPropagator;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskThreadContextStatePropagator;

Expand Down Expand Up @@ -126,7 +127,8 @@ public ThreadContext(Settings settings) {
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(),
new ResourceUsageStatsTCPropagator()));
}

public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID;


public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator {
public static final String NODE_RESOURCE_STATS = "PERF_STATS";
@Override
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue());
}
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue().toString());
}
}
return transients;
//return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
package org.opensearch.transport;public class ResourceUsageStatsReference {
}
182 changes: 117 additions & 65 deletions server/src/main/java/org/opensearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,53 @@ public TransportService(
);
}

public TransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer) {
this(
settings,
transport,
threadPool,
transportInterceptor,
localNodeFactory,
clusterSettings,
taskHeaders,
new ClusterConnectionManager(settings, transport),
tracer,
null
);

}
public TransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
ConnectionManager connectionManager,
Tracer tracer) {
this(
settings,
transport,
threadPool,
transportInterceptor,
localNodeFactory,
clusterSettings,
taskHeaders,
new ClusterConnectionManager(settings, transport),
tracer,
null
);
}

public TransportService(
Settings settings,
Transport transport,
Expand Down Expand Up @@ -932,62 +979,30 @@ public String toString() {
}

private void addResourceUsageStatsToThreadContext(String action) {
//if(action.startsWith("indices:data/read/search")) {
if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
//}
}

public final <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler,
final boolean shouldAddResourceUsageStats
) {
final TransportResponseHandler<T> delegate;
if(shouldAddResourceUsageStats) {
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
handler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
handler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
if(action.startsWith("indices:data/read/search[phase/fetch")) {
System.out.println("===ACTION=== " + action);
}
if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
logger.info("adding stats");
try {
//threadPool.getThreadContext().addResponseHeader("BLAH_STATS-action" , localNode.getId() +action);
ResourceUsageStatsReference statsReference = threadPool.getThreadContext()
.getTransient("PERF_STATS" + localNode.getId());
if(statsReference != null) {
System.out.println("Old stats reference : " + statsReference.toString());
statsReference.setResourceUsageStats(resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
System.out.println("resource stats : " + resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
System.out.println("New stats reference : " + statsReference.getResourceUsageStats());
} else {
threadPool.getThreadContext().putTransient("PERF_STATS" + localNode.getId(),
new ResourceUsageStatsReference(resourceUsageCollectorService.getLocalNodeStatistics().get().toString()));
}
};
} else {
delegate = handler;
} catch (Exception e) {
logger.info("===EXCEPTION=== {} ===action=== {}", e.getMessage(), action);
}
threadPool.getThreadContext().addResponseHeader("PERF_STATS" + localNode.getId(),
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
sendRequest(connection, action, request, options, delegate);
}

/**
Expand Down Expand Up @@ -1319,32 +1334,35 @@ public void onRequestReceived(long requestId, String action) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
addStatsToResourceUsageCollectorService();
addStatsToResourceUsageCollectorServiceFromRequestHeaders();
messageListener.onRequestReceived(requestId, action);
}

private void addStatsToResourceUsageCollectorService() {
try {
Map<String, List<String>> responseHeaders = threadPool.getThreadContext().getResponseHeaders();

if (responseHeaders.size() > 0) {
List<String> perfStats = responseHeaders.get("PERF_STATS");
if(perfStats.size() == 0) return;
// nodeid:111113131313,11.0,11.0
// NodeResourceUsageStats[aaxnzZb7R3KdRqjqXfv8SQ](Timestamp: 1699253278365, CPU utilization percent: 3.1, Memory utilization percent: 25.0)

StringBuilder sb = new StringBuilder();
String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':') + 1);
String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':'));
if(nodeId.length() == 0)
if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) {
long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp();
if (System.currentTimeMillis() - timestamp < 1000) {
logger.warn("Node resource usage stats is updated recently - so skipping");
logger.info("Node resource usage stats is updated recently - so skipping");
} else {
String[] parse = perfStats.get(0).split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
//logger.warn("Updates stats");
logger.info("Updates stats");
}
} else {
String[] parse = perfStats.get(0).split(":");
Expand All @@ -1353,15 +1371,49 @@ private void addStatsToResourceUsageCollectorService() {
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
logger.warn("added stats");
logger.info("added stats");
}
// String[] parse = responseHeaders.get("PERF_STATS").get(0).split(":")[1].split("NodeResourceUsageStats\\[")[1].split("]");
// String nodeId = parse[0];
// String[] parse1 = parse[1].split("\\(")[1].split(": ");
// String timestamp = parse1[1].split(",")[0];
// String cpu = parse1[2].split(",")[0];
// String memory = parse1[3].split("\\)")[0];
}
} catch(Exception e){
logger.warn("Adding stats failed : ", e);
}
}

private void addStatsToResourceUsageCollectorServiceFromRequestHeaders() {
try {

for(Map.Entry<String,String> entry : threadPool.getThreadContext().getHeaders().entrySet()) {
if(entry.getKey().contains("PERF_STATS")) {
String perfStats = entry.getValue();
assert(threadPool.getThreadContext().getResponseHeaders().get(entry.getKey()).contains(entry.getValue()));
StringBuilder sb = new StringBuilder();
String nodeId = perfStats.substring(0, perfStats.indexOf(':'));
if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) {
long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp();
if (System.currentTimeMillis() - timestamp < 1000) {
logger.info("Node resource usage stats is updated recently - so skipping");
} else {
String[] parse = perfStats.split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId,
Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
logger.info("Updates stats");
}
} else {
String[] parse = perfStats.split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId,
Long.valueOf(datatimestamp),
Double.valueOf(cpu), Double.valueOf(memory));
logger.info("added stats");
}
}
}
} catch(Exception e){
logger.warn("Adding stats failed : ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,18 @@ public void testStashContext() {
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
threadContext.addResponseHeader("resp", "val");
threadContext.putTransient("PERF_STATS_NODE1", "abc");
threadContext.addResponseHeader("resp", "val1");
threadContext.putHeader("PERF", "1");
assertEquals("val1", threadContext.getResponseHeaders().get("resp").get(1));
//threadContext.putTransient("PERF_STATS_NODE1", "cde");
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
assertNull(threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("PERF"));
assertNull(threadContext.getTransient("ctx.foo"));
assertNotNull(threadContext.getTransient("PERF_STATS_NODE1"));
assertNull(threadContext.getResponseHeaders().get("resp"));
assertEquals("1", threadContext.getHeader("default"));
}

Expand Down Expand Up @@ -463,9 +472,13 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException {
threadContext.putHeader("foo", "bar");
threadContext.putTransient("ctx.foo", 1);

// This is part of propagators
threadContext.putTransient("PERF_STATS", "abc");

assertEquals("bar", threadContext.getHeader("foo"));
assertNotNull(threadContext.getTransient("ctx.foo"));
assertNull(threadContext.getHeader("default"));
//threadContext.stashContext();
threadContext.writeTo(out);
}
{
Expand All @@ -475,10 +488,63 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException {

assertEquals("bar", otherhreadContext.getHeader("foo"));
assertNull(otherhreadContext.getTransient("ctx.foo"));
assertNotNull(otherhreadContext.getHeader("PERF_STATS"));
assertEquals("5", otherhreadContext.getHeader("default"));
}
}

public void testSerializeInDifferentContextNoDefaults1() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
{
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

// This is part of propagators
threadContext.putTransient("PERF_STATS", "abc");
assertNotNull(threadContext.getTransient("PERF_STATS"));
threadContext.writeTo(out);
}
{
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
ThreadContext otherhreadContext = new ThreadContext(otherSettings);
otherhreadContext.readHeaders(out.bytes().streamInput());

// Here its not null - as the transient headers get propagated as part of request headers
// during serialization
assertNotNull(otherhreadContext.getHeader("PERF_STATS"));
}
}

public void testSerializeInDifferentContextNoDefaultsStash() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
{
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

// This is part of propagators
threadContext.putTransient("PERF_STATS", "abc");
threadContext.putTransient("random", "cde");
assertNotNull(threadContext.getTransient("PERF_STATS"));
threadContext.stashContext();

// After stash perf stats is not cleared up since its part of propagators
assertNotNull(threadContext.getTransient("PERF_STATS"));

// This is cleared since its not part of propagators
assertNull(threadContext.getTransient("random"));

// serializing the threadcontext
threadContext.writeTo(out);
}
{
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
ThreadContext otherhreadContext = new ThreadContext(otherSettings);
otherhreadContext.readHeaders(out.bytes().streamInput());

// Here its not null - as the transient headers get propagated as part of request headers
// during serialization
assertNotNull(otherhreadContext.getHeader("PERF_STATS"));
}
}

public void testCanResetDefault() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
Expand Down

0 comments on commit a891884

Please sign in to comment.