fix(interactive): Introduce MetricsTool to profile memory usage, pending tasks and qps (#4332)


## What do these changes do?
The `MetricsTool` prints the following metrics periodically (interval controlled by the `metrics.tool.interval.ms` setting, default 5 minutes):
1. `memory.usage`: memory usage of the JVM and direct memory
2. `rpc.channels.executor.queue`: pending tasks in each gRPC (Netty) channel
3. `gremlin.executor.queue`: pending tasks in the Gremlin executor
4. `gremlin.qps`: Gremlin QPS (queries per second)

These metrics are logged to a separate file, configured by the `PerfMetricLog` appender in
[logback.xml](https://github.com/shirly121/GraphScope/blob/ir_metric_tool/interactive_engine/assembly/src/conf/groot/logback.xml#L37).
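
For orientation, the sketch below condenses how the metrics are wired up, based on the `GraphServer` and `RpcExecutionClient` hunks further down in this diff. The `MetricsWiringSketch` class is purely illustrative and does not exist in the codebase; in the actual change, `GraphServer` registers `MemoryMetric`, `RpcExecutionClient` registers `RpcExecutorMetric` in its constructor, and the `metricsTool` is also passed into `IrGremlinServer`, presumably for the Gremlin queue and QPS metrics.

```java
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.metric.RpcExecutorMetric;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.metric.MemoryMetric;
import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.pegasus.RpcChannel;

// Illustrative only: condenses the wiring that is spread across GraphServer and RpcExecutionClient.
public class MetricsWiringSketch {
    public static MetricsTool wire(Configs configs, ChannelFetcher<RpcChannel> channelFetcher) {
        // Reports all registered metrics periodically (metrics.tool.interval.ms, default 5 minutes).
        MetricsTool metricsTool = new MetricsTool(configs);
        // memory.usage: JVM and direct memory usage.
        metricsTool.registerMetric(new MemoryMetric());
        // rpc.channels.executor.queue: pending tasks in each gRPC (Netty) channel.
        metricsTool.registerMetric(new RpcExecutorMetric(channelFetcher));
        return metricsTool;
    }
}
```

The resulting log lines go to `perf_metric.log` via the `PerfMetricLog` appender configured below.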



## Related issue number


Fixes
shirly121 authored Dec 3, 2024
1 parent 1d118e9 commit 1199f20
Showing 28 changed files with 761 additions and 75 deletions.
17 changes: 17 additions & 0 deletions interactive_engine/assembly/src/conf/groot/logback.xml
@@ -34,6 +34,23 @@
        </encoder>
    </appender>

    <appender name="PerfMetricLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log_dir}/perf_metric.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${log_dir}/perf_metric.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
            <maxHistory>7</maxHistory>
            <maxFileSize>100MB</maxFileSize>
            <totalSizeCap>500MB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
        </encoder>
    </appender>

    <Logger name="PerfMetricLog" level="INFO" additivity="false">
        <appender-ref ref="PerfMetricLog"/>
    </Logger>

    <logger name="org.apache.zookeeper" level="ERROR" />
    <logger name="org.apache.kafka" level="ERROR" />
    <logger name="kafka" level="ERROR" />
@@ -35,6 +35,8 @@
import com.alibaba.graphscope.common.ir.tools.QueryCache;
import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.metric.MemoryMetric;
import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.cypher.service.CypherBootstrapper;
import com.alibaba.graphscope.gremlin.integration.result.GraphProperties;
import com.alibaba.graphscope.gremlin.integration.result.TestGraphFactory;
@@ -62,6 +64,7 @@ public class GraphServer {
private final IrMetaQueryCallback metaQueryCallback;
private final GraphProperties testGraph;
private final GraphRelOptimizer optimizer;
private final MetricsTool metricsTool;

private IrGremlinServer gremlinServer;
private CypherBootstrapper cypherBootstrapper;
@@ -77,10 +80,13 @@ public GraphServer(
this.metaQueryCallback = metaQueryCallback;
this.testGraph = testGraph;
this.optimizer = optimizer;
this.metricsTool = new MetricsTool(configs);
this.metricsTool.registerMetric(new MemoryMetric());
}

public void start() throws Exception {
ExecutionClient executionClient = ExecutionClient.Factory.create(configs, channelFetcher);
ExecutionClient executionClient =
ExecutionClient.Factory.create(configs, channelFetcher, metricsTool);
QueryIdGenerator idGenerator = new QueryIdGenerator(configs);
QueryCache queryCache = new QueryCache(configs);
if (!FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs)) {
@@ -95,7 +101,8 @@ public void start() throws Exception {
executionClient,
channelFetcher,
metaQueryCallback,
testGraph);
testGraph,
metricsTool);
this.gremlinServer.start();
}
if (!FrontendConfig.NEO4J_BOLT_SERVER_DISABLED.get(configs)) {
@@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;

/**
@@ -45,10 +46,11 @@ public abstract void submit(
public abstract void close() throws Exception;

public static class Factory {
public static ExecutionClient create(Configs configs, ChannelFetcher channelFetcher) {
public static ExecutionClient create(
Configs configs, ChannelFetcher channelFetcher, MetricsTool metricsTool) {
switch (channelFetcher.getType()) {
case RPC:
return new RpcExecutionClient(configs, channelFetcher);
return new RpcExecutionClient(configs, channelFetcher, metricsTool);
case HTTP:
return new HttpExecutionClient(configs, channelFetcher);
default:
@@ -17,38 +17,41 @@
package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.metric.RpcExecutorMetric;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;

import io.grpc.ClientInterceptors;
import io.grpc.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicReference;
import java.util.List;
import java.util.stream.Collectors;

/**
* rpc client to send request to pegasus engine service
*/
public class RpcExecutionClient extends ExecutionClient<RpcChannel> {
Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class);
private final Configs graphConfig;
private final AtomicReference<RpcClient> rpcClientRef;

public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channelFetcher) {
public RpcExecutionClient(
Configs graphConfig,
ChannelFetcher<RpcChannel> channelFetcher,
MetricsTool metricsTool) {
super(channelFetcher);
this.graphConfig = graphConfig;
this.rpcClientRef = new AtomicReference<>();
metricsTool.registerMetric(new RpcExecutorMetric(channelFetcher));
}

@Override
Expand All @@ -58,10 +61,18 @@ public void submit(
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws Exception {
if (rpcClientRef.get() == null) {
rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch()));
}
RpcClient rpcClient = rpcClientRef.get();
List<RpcChannel> interceptChannels =
channelFetcher.fetch().stream()
.map(
k ->
new RpcChannel(
ClientInterceptors.intercept(
k.getChannel(), new RpcInterceptor())))
.collect(Collectors.toList());
RpcClient rpcClient =
new RpcClient(
interceptChannels,
ImmutableMap.of(RpcInterceptor.QUERY_LOGGER_OPTION, queryLogger));
PegasusClient.JobRequest jobRequest =
PegasusClient.JobRequest.newBuilder()
.setPlan(
@@ -99,7 +110,8 @@ public void process(PegasusClient.JobResponse jobResponse) {
@Override
public void finish() {
listener.onCompleted();
queryLogger.info("[compile]: received results from engine");
queryLogger.info(
"[query][response]: received all responses from all servers");
}

@Override
@@ -113,8 +125,17 @@ public void error(Status status) {

@Override
public void close() throws Exception {
if (rpcClientRef.get() != null) {
rpcClientRef.get().shutdown();
}
channelFetcher
.fetch()
.forEach(
k -> {
try {
if (k != null) {
k.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
@@ -0,0 +1,92 @@
/*
*
* * Copyright 2020 Alibaba Group Holding Limited.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.gremlin.plugin.QueryLogger;

import io.grpc.*;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;

public class RpcInterceptor implements ClientInterceptor {
public static final CallOptions.Key<QueryLogger> QUERY_LOGGER_OPTION =
CallOptions.Key.create("query-logger");

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
private Instant requestStartTime;

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
requestStartTime = Instant.now();
QueryLogger queryLogger = callOptions.getOption(QUERY_LOGGER_OPTION);
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
private final AtomicBoolean firstResponseLogged =
new AtomicBoolean(false);

@Override
public void onMessage(RespT message) {
if (firstResponseLogged.compareAndSet(false, true)) {
long firstResponseTime =
Instant.now().toEpochMilli()
- requestStartTime.toEpochMilli();
if (queryLogger != null) {
queryLogger.info(
"[query][response]: receive the first response from"
+ " the channel {} in {} ms",
channel.authority(),
firstResponseTime);
}
}
super.onMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
long endTime = Instant.now().toEpochMilli();
long totalTime = endTime - requestStartTime.toEpochMilli();
if (queryLogger != null) {
queryLogger.info(
"[query][response]: receive the last response from the"
+ " channel {} with status {} in {} ms",
channel.authority(),
status,
totalTime);
}
super.onClose(status, trailers);
}
},
headers);
if (queryLogger != null) {
queryLogger.info(
"[query][submitted]: submit the query to the task queue of channel {}",
channel.authority());
}
}
};
}
}
@@ -18,9 +18,10 @@

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HiactorConfig;
import com.alibaba.graphscope.common.config.Utils;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@@ -29,19 +30,18 @@
*/
public class HostURIChannelFetcher implements ChannelFetcher<URI> {
private static final String schema = "http";
private Configs graphConfig;
private final List<URI> uriChannels;

public HostURIChannelFetcher(Configs graphConfig) {
this.graphConfig = graphConfig;
this.uriChannels =
Utils.convertDotString(HiactorConfig.HIACTOR_HOSTS.get(graphConfig)).stream()
.map(k -> URI.create(schema + "://" + k))
.collect(Collectors.toList());
}

@Override
public List<URI> fetch() {
String hosts = HiactorConfig.HIACTOR_HOSTS.get(graphConfig);
String[] hostsArr = hosts.split(",");
return Arrays.asList(hostsArr).stream()
.map(k -> URI.create(schema + "://" + k))
.collect(Collectors.toList());
return Collections.unmodifiableList(uriChannels);
}

@Override
@@ -21,30 +21,30 @@
import com.alibaba.graphscope.common.config.Utils;
import com.alibaba.pegasus.RpcChannel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* rpc implementation of {@link ChannelFetcher}, init rpc from local config
*/
public class HostsRpcChannelFetcher implements ChannelFetcher<RpcChannel> {
private Configs config;
private final List<RpcChannel> rpcChannels;

public HostsRpcChannelFetcher(Configs config) {
this.config = config;
this.rpcChannels =
Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)).stream()
.map(
k -> {
String[] host = k.split(":");
return new RpcChannel(host[0], Integer.valueOf(host[1]));
})
.collect(Collectors.toList());
}

@Override
public List<RpcChannel> fetch() {
List<String> hostAddresses =
Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config));
List<RpcChannel> rpcChannels = new ArrayList<>();
hostAddresses.forEach(
k -> {
String[] host = k.split(":");
rpcChannels.add(new RpcChannel(host[0], Integer.valueOf(host[1])));
});
return rpcChannels;
return Collections.unmodifiableList(rpcChannels);
}

@Override
