diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml
index 93b56c971b09..4eb219fa22ee 100644
--- a/interactive_engine/assembly/src/conf/groot/logback.xml
+++ b/interactive_engine/assembly/src/conf/groot/logback.xml
@@ -34,6 +34,23 @@
+
+ ${log_dir}/perf_metric.log
+
+ ${log_dir}/perf_metric.%d{yyyy-MM-dd}.%i.gz
+ 7
+ 100MB
+ 500MB
+
+
+ [%d{ISO8601}][%p][%t][%c:%L] %m%n
+
+
+
+
+
+
+
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java
index 4ac59f7a7ad9..ab11c9b198b2 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java
@@ -35,6 +35,8 @@
import com.alibaba.graphscope.common.ir.tools.QueryCache;
import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
+import com.alibaba.graphscope.common.metric.MemoryMetric;
+import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.cypher.service.CypherBootstrapper;
import com.alibaba.graphscope.gremlin.integration.result.GraphProperties;
import com.alibaba.graphscope.gremlin.integration.result.TestGraphFactory;
@@ -62,6 +64,7 @@ public class GraphServer {
private final IrMetaQueryCallback metaQueryCallback;
private final GraphProperties testGraph;
private final GraphRelOptimizer optimizer;
+ private final MetricsTool metricsTool;
private IrGremlinServer gremlinServer;
private CypherBootstrapper cypherBootstrapper;
@@ -77,10 +80,13 @@ public GraphServer(
this.metaQueryCallback = metaQueryCallback;
this.testGraph = testGraph;
this.optimizer = optimizer;
+ this.metricsTool = new MetricsTool(configs);
+ this.metricsTool.registerMetric(new MemoryMetric());
}
public void start() throws Exception {
- ExecutionClient executionClient = ExecutionClient.Factory.create(configs, channelFetcher);
+ ExecutionClient executionClient =
+ ExecutionClient.Factory.create(configs, channelFetcher, metricsTool);
QueryIdGenerator idGenerator = new QueryIdGenerator(configs);
QueryCache queryCache = new QueryCache(configs);
if (!FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs)) {
@@ -95,7 +101,8 @@ public void start() throws Exception {
executionClient,
channelFetcher,
metaQueryCallback,
- testGraph);
+ testGraph,
+ metricsTool);
this.gremlinServer.start();
}
if (!FrontendConfig.NEO4J_BOLT_SERVER_DISABLED.get(configs)) {
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java
index 089726d7fe0f..9cf88effecaf 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java
@@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
+import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
/**
@@ -45,10 +46,11 @@ public abstract void submit(
public abstract void close() throws Exception;
public static class Factory {
- public static ExecutionClient create(Configs configs, ChannelFetcher channelFetcher) {
+ public static ExecutionClient create(
+ Configs configs, ChannelFetcher channelFetcher, MetricsTool metricsTool) {
switch (channelFetcher.getType()) {
case RPC:
- return new RpcExecutionClient(configs, channelFetcher);
+ return new RpcExecutionClient(configs, channelFetcher, metricsTool);
case HTTP:
return new HttpExecutionClient(configs, channelFetcher);
default:
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java
index 1ef68e17a1a0..ca84658a71d2 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java
@@ -17,38 +17,41 @@
package com.alibaba.graphscope.common.client;
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
+import com.alibaba.graphscope.common.client.metric.RpcExecutorMetric;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
+import com.alibaba.graphscope.common.metric.MetricsTool;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
+import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
+import io.grpc.ClientInterceptors;
import io.grpc.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* rpc client to send request to pegasus engine service
*/
public class RpcExecutionClient extends ExecutionClient {
- Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class);
private final Configs graphConfig;
- private final AtomicReference rpcClientRef;
- public RpcExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) {
+ public RpcExecutionClient(
+ Configs graphConfig,
+ ChannelFetcher channelFetcher,
+ MetricsTool metricsTool) {
super(channelFetcher);
this.graphConfig = graphConfig;
- this.rpcClientRef = new AtomicReference<>();
+ metricsTool.registerMetric(new RpcExecutorMetric(channelFetcher));
}
@Override
@@ -58,10 +61,18 @@ public void submit(
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws Exception {
- if (rpcClientRef.get() == null) {
- rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch()));
- }
- RpcClient rpcClient = rpcClientRef.get();
+ List interceptChannels =
+ channelFetcher.fetch().stream()
+ .map(
+ k ->
+ new RpcChannel(
+ ClientInterceptors.intercept(
+ k.getChannel(), new RpcInterceptor())))
+ .collect(Collectors.toList());
+ RpcClient rpcClient =
+ new RpcClient(
+ interceptChannels,
+ ImmutableMap.of(RpcInterceptor.QUERY_LOGGER_OPTION, queryLogger));
PegasusClient.JobRequest jobRequest =
PegasusClient.JobRequest.newBuilder()
.setPlan(
@@ -99,7 +110,8 @@ public void process(PegasusClient.JobResponse jobResponse) {
@Override
public void finish() {
listener.onCompleted();
- queryLogger.info("[compile]: received results from engine");
+ queryLogger.info(
+ "[query][response]: received all responses from all servers");
}
@Override
@@ -113,8 +125,17 @@ public void error(Status status) {
@Override
public void close() throws Exception {
- if (rpcClientRef.get() != null) {
- rpcClientRef.get().shutdown();
- }
+ channelFetcher
+ .fetch()
+ .forEach(
+ k -> {
+ try {
+ if (k != null) {
+ k.shutdown();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
}
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java
new file mode 100644
index 000000000000..647a16a83d23
--- /dev/null
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * * Copyright 2020 Alibaba Group Holding Limited.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package com.alibaba.graphscope.common.client;
+
+import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
+
+import io.grpc.*;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RpcInterceptor implements ClientInterceptor {
+ public static final CallOptions.Key QUERY_LOGGER_OPTION =
+ CallOptions.Key.create("query-logger");
+
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor methodDescriptor,
+ CallOptions callOptions,
+ Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall(
+ channel.newCall(methodDescriptor, callOptions)) {
+ private Instant requestStartTime;
+
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ requestStartTime = Instant.now();
+ QueryLogger queryLogger = callOptions.getOption(QUERY_LOGGER_OPTION);
+ super.start(
+ new ForwardingClientCallListener.SimpleForwardingClientCallListener(
+ responseListener) {
+ private final AtomicBoolean firstResponseLogged =
+ new AtomicBoolean(false);
+
+ @Override
+ public void onMessage(RespT message) {
+ if (firstResponseLogged.compareAndSet(false, true)) {
+ long firstResponseTime =
+ Instant.now().toEpochMilli()
+ - requestStartTime.toEpochMilli();
+ if (queryLogger != null) {
+ queryLogger.info(
+ "[query][response]: receive the first response from"
+ + " the channel {} in {} ms",
+ channel.authority(),
+ firstResponseTime);
+ }
+ }
+ super.onMessage(message);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ long endTime = Instant.now().toEpochMilli();
+ long totalTime = endTime - requestStartTime.toEpochMilli();
+ if (queryLogger != null) {
+ queryLogger.info(
+ "[query][response]: receive the last response from the"
+ + " channel {} with status {} in {} ms",
+ channel.authority(),
+ status,
+ totalTime);
+ }
+ super.onClose(status, trailers);
+ }
+ },
+ headers);
+ if (queryLogger != null) {
+ queryLogger.info(
+ "[query][submitted]: submit the query to the task queue of channel {}",
+ channel.authority());
+ }
+ }
+ };
+ }
+}
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java
index 2e0830276b8f..f25495bc9528 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java
@@ -18,9 +18,10 @@
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HiactorConfig;
+import com.alibaba.graphscope.common.config.Utils;
import java.net.URI;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -29,19 +30,18 @@
*/
public class HostURIChannelFetcher implements ChannelFetcher {
private static final String schema = "http";
- private Configs graphConfig;
+ private final List uriChannels;
public HostURIChannelFetcher(Configs graphConfig) {
- this.graphConfig = graphConfig;
+ this.uriChannels =
+ Utils.convertDotString(HiactorConfig.HIACTOR_HOSTS.get(graphConfig)).stream()
+ .map(k -> URI.create(schema + "://" + k))
+ .collect(Collectors.toList());
}
@Override
public List fetch() {
- String hosts = HiactorConfig.HIACTOR_HOSTS.get(graphConfig);
- String[] hostsArr = hosts.split(",");
- return Arrays.asList(hostsArr).stream()
- .map(k -> URI.create(schema + "://" + k))
- .collect(Collectors.toList());
+ return Collections.unmodifiableList(uriChannels);
}
@Override
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java
index 63f5544c6857..39f980288ceb 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java
@@ -21,30 +21,30 @@
import com.alibaba.graphscope.common.config.Utils;
import com.alibaba.pegasus.RpcChannel;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* rpc implementation of {@link ChannelFetcher}, init rpc from local config
*/
public class HostsRpcChannelFetcher implements ChannelFetcher {
- private Configs config;
+ private final List rpcChannels;
public HostsRpcChannelFetcher(Configs config) {
- this.config = config;
+ this.rpcChannels =
+ Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)).stream()
+ .map(
+ k -> {
+ String[] host = k.split(":");
+ return new RpcChannel(host[0], Integer.valueOf(host[1]));
+ })
+ .collect(Collectors.toList());
}
@Override
public List fetch() {
- List hostAddresses =
- Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config));
- List rpcChannels = new ArrayList<>();
- hostAddresses.forEach(
- k -> {
- String[] host = k.split(":");
- rpcChannels.add(new RpcChannel(host[0], Integer.valueOf(host[1])));
- });
- return rpcChannels;
+ return Collections.unmodifiableList(rpcChannels);
}
@Override
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java
new file mode 100644
index 000000000000..7c9beee321a8
--- /dev/null
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * * Copyright 2020 Alibaba Group Holding Limited.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package com.alibaba.graphscope.common.client.metric;
+
+import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
+import com.alibaba.graphscope.common.metric.Metric;
+import com.alibaba.pegasus.RpcChannel;
+import com.google.common.collect.Maps;
+
+import io.grpc.ManagedChannel;
+import io.grpc.internal.RpcUtils;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+
+import java.util.List;
+import java.util.Map;
+
+public class RpcExecutorMetric implements Metric