Skip to content

Commit

Permalink
fix(interactive): Support Statistics in Groot Storage (#3856)
Browse files Browse the repository at this point in the history
Fixes #3843
---------
Co-authored-by: siyuan0322 <[email protected]>
  • Loading branch information
BingqingLyu authored Jun 14, 2024
1 parent 4e757e0 commit aa60805
Show file tree
Hide file tree
Showing 49 changed files with 641 additions and 68 deletions.
4 changes: 2 additions & 2 deletions charts/gie-standalone/templates/frontend/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ spec:
value: {{ .Values.frontendQueryPerSecondLimit | quote }}
- name: GREMLIN_SCRIPT_LANGUAGE_NAME
value: {{ .Values.gremlinScriptLanguageName | quote }}
- name: PHYSICAL_OPT_CONFIG
value: {{ .Values.physicalOptConfig}}
- name: GRAPH_PHYSICAL_OPT
value: {{ .Values.graphPhysicalOpt | quote }}
ports:
- name: gremlin
containerPort: {{ .Values.frontend.service.gremlinPort }}
Expand Down
9 changes: 9 additions & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ data:
gremlin.server.port=12312
## disable neo4j when launching groot server by default
neo4j.bolt.server.disabled=true

## GOpt config
graph.planner.is.on={{ .Values.graphPlannerIsOn }}
graph.planner.opt={{ .Values.graphPlannerOpt }}
graph.planner.rules={{ .Values.graphPlannerRules }}
graph.physical.opt={{ .Values.graphPhysicalOpt }}
gremlin.script.language.name={{ .Values.gremlinScriptLanguageName }}
query.execution.timeout.ms={{ .Values.queryExecutionTimeoutMs }}

log4rs.config=LOG4RS_CONFIG
## Auth config
Expand Down Expand Up @@ -88,6 +96,7 @@ data:
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}
collect.statistics={{ .Values.collectStatistics }}
log.recycle.offset.reserve={{ .Values.logRecycleOffsetReserve }}

## Extra Config
Expand Down
11 changes: 11 additions & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,15 @@ zkBasePath: "/graphscope/groot"
## Frontend Config
# gremlinServerPort: 12312

## GOpt config
## To adopt a CBO planner, set graphPlannerOpt to CBO, set gremlinScriptLanguageName to antlr_gremlin_calcite, and set graphPhysicalOpt to proto.
## graphPhysicalOpt is the key read by templates/configmap.yaml, where it is rendered as graph.physical.opt.
graphPlannerIsOn: true
graphPlannerOpt: RBO
graphPlannerRules: FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule, ExpandGetVFusionRule
gremlinScriptLanguageName: antlr_gremlin_traversal
graphPhysicalOpt: ffi
queryExecutionTimeoutMs: 600000

## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""
Expand Down Expand Up @@ -532,3 +541,5 @@ uptrace:

distributed:
enabled: false

collectStatistics: false
2 changes: 2 additions & 0 deletions interactive_engine/assembly/src/bin/groot/store_ctl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ _setup_env() {

mkdir -p ${LOG_DIR}

export OTEL_SDK_DISABLED="${OTEL_SDK_DISABLED:-true}"

export LD_LIBRARY_PATH=${GROOT_HOME}/native:${GROOT_HOME}/native/lib:${LD_LIBRARY_PATH}:/usr/local/lib
libpath="$(echo "${GROOT_HOME}"/lib/*.jar | tr ' ' ':')"
}
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/assembly/src/conf/groot/config.template
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ pegasus.worker.num=2
pegasus.hosts=localhost:8080

kafka.test.cluster.enable=true
OTEL_SDK_DISABLED=true
collect.statistics=false
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,7 @@ public class CommonConfig {
// Only available in multi pod mode.
public static final Config<Boolean> WRITE_HA_ENABLED =
Config.boolConfig("write.ha.enabled", false);

// Boolean switch read from the "collect.statistics" key; defaults to false.
// NOTE(review): presumably enables statistics collection in the store - confirm against the reader of this config.
public static final Config<Boolean> COLLECT_STATISTICS =
Config.boolConfig("collect.statistics", false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
public interface SchemaFetcher {
Map<Long, GraphSchema> getSchemaSnapshotPair();

GraphStatistics getStatistics();

int getPartitionNum();

int getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics;
import com.alibaba.graphscope.groot.common.schema.wrapper.EdgeKind;
import com.alibaba.graphscope.groot.common.schema.wrapper.LabelId;
import com.alibaba.graphscope.proto.groot.Statistics;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -94,4 +96,32 @@ public Long getEdgeTypeCount(

return count == null ? 0L : count;
}

/**
 * Builds a {@link DefaultGraphStatistics} from a {@link Statistics} protobuf message.
 *
 * <p>The total vertex and edge counts come from the message's top-level counters. Each
 * per-type entry is copied into a map keyed by the parsed {@link LabelId} or
 * {@link EdgeKind}; when two entries map to an equal key, the later one replaces the
 * earlier one.
 *
 * @param statistics the protobuf message to convert
 * @return a statistics object holding the copied counts
 */
public static DefaultGraphStatistics parseProto(Statistics statistics) {
    Map<LabelId, Long> vertexCountsByLabel = new HashMap<>();
    statistics
            .getVertexTypeStatisticsList()
            .forEach(
                    entry ->
                            vertexCountsByLabel.put(
                                    LabelId.parseProto(entry.getLabelId()),
                                    entry.getNumVertices()));

    Map<EdgeKind, Long> edgeCountsByKind = new HashMap<>();
    statistics
            .getEdgeTypeStatisticsList()
            .forEach(
                    entry ->
                            edgeCountsByKind.put(
                                    EdgeKind.parseProto(entry.getEdgeKind()),
                                    entry.getNumEdges()));

    return new DefaultGraphStatistics(
            vertexCountsByLabel,
            edgeCountsByKind,
            statistics.getNumVertices(),
            statistics.getNumEdges());
}

/**
 * Renders the per-type count maps and both totals as
 * {@code DefaultGraphStatistics{vertexTypeCounts=..., edgeTypeCounts=..., totalVertexCount=...,
 * totalEdgeCount=...}}.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("DefaultGraphStatistics{");
    text.append("vertexTypeCounts=").append(vertexTypeCounts);
    text.append(", edgeTypeCounts=").append(edgeTypeCounts);
    text.append(", totalVertexCount=").append(totalVertexCount);
    text.append(", totalEdgeCount=").append(totalEdgeCount);
    text.append('}');
    return text.toString();
}
}
2 changes: 1 addition & 1 deletion interactive_engine/compiler/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ run:
-Dgraph.store=${graph.store} \
-Dpegasus.hosts=${pegasus.hosts} \
-Dgremlin.script.language.name=${gremlin.script.language.name} \
-Dphysical.opt.config=${physical.opt.config} \
-Dgraph.physical.opt=${graph.physical.opt} \
-Dgraph.planner.rules=${graph.planner.rules} \
-Dgraph.planner.opt=${graph.planner.opt} \
-Dgraph.statistics=${graph.statistics} \
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/compiler/conf/ir.compiler.properties
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ calcite.default.charset: UTF-8
# gremlin.script.language.name: antlr_gremlin_traversal

# the output plan format, can be ffi(default) or proto
# physical.opt.config: ffi
# graph.physical.opt: ffi

# set the max capacity of the result streaming buffer for each query
# per.query.stream.buffer.max.capacity: 256
2 changes: 1 addition & 1 deletion interactive_engine/compiler/ir_experimental_advanced_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=0 ./start_rp
cd ${base_dir}/../executor/ir/target/release &&
RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 &
sleep 10
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
sleep 5s
cd ${base_dir} && make pattern_test && make ldbc_test && make simple_test
exit_code=$?
Expand Down
6 changes: 3 additions & 3 deletions interactive_engine/compiler/ir_experimental_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fi

# Test2: run gremlin standard tests on experimental store via calcite-based ir
# restart compiler service
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
sleep 5s
# run gremlin standard tests to test calcite-based IR layer
cd ${base_dir} && make gremlin_calcite_test
Expand All @@ -42,7 +42,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=0 ./start_
cd ${base_dir}/../executor/ir/target/release &&
RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 &
# start compiler service
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 &
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 &
sleep 5s
cd ${base_dir} && make gremlin_calcite_test
exit_code=$?
Expand Down Expand Up @@ -76,7 +76,7 @@ fi

# Test5: run cypher movie tests on experimental store via calcite-based ir
# restart compiler service
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json physical.opt.config=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json graph.physical.opt=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
sleep 10s
export ENGINE_TYPE=pegasus
cd ${base_dir} && make cypher_test
Expand Down
4 changes: 2 additions & 2 deletions interactive_engine/compiler/set_properties.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ frontend_query_per_second_limit="frontend.query.per.second.limit: $FRONTEND_QUER

gremlin_script_language_name="gremlin.script.language.name: $GREMLIN_SCRIPT_LANGUAGE_NAME"

physical_opt_config="physical.opt.config: $PHYSICAL_OPT_CONFIG"
graph_physical_opt="graph.physical.opt: $GRAPH_PHYSICAL_OPT"

count=1;
while (($count<$SERVERSSIZE))
Expand All @@ -47,6 +47,6 @@ done

graph_schema="graph.schema: $GRAPH_SCHEMA"

properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$physical_opt_config"
properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$graph_physical_opt"

echo -e $properties > ./conf/ir.compiler.properties
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class FrontendConfig {
public static final Config<String> GREMLIN_SCRIPT_LANGUAGE_NAME =
Config.stringConfig("gremlin.script.language.name", "antlr_gremlin_traversal");

public static final Config<String> PHYSICAL_OPT_CONFIG =
Config.stringConfig("physical.opt.config", "ffi");
public static final Config<String> GRAPH_PHYSICAL_OPT =
Config.stringConfig("graph.physical.opt", "ffi");

public static final Config<Integer> PER_QUERY_STREAM_BUFFER_MAX_CAPACITY =
Config.intConfig("per.query.stream.buffer.max.capacity", 256);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public PhysicalPlan planPhysical(LogicalPlan logicalPlan) {
if (logicalPlan.isReturnEmpty()) {
return PhysicalPlan.createEmpty();
} else if (logicalPlan.getRegularQuery() != null) {
String physicalOpt = FrontendConfig.PHYSICAL_OPT_CONFIG.get(graphConfig);
String physicalOpt = FrontendConfig.GRAPH_PHYSICAL_OPT.get(graphConfig);
if ("proto".equals(physicalOpt.toLowerCase())) {
logger.debug("physical type is proto");
try (GraphRelProtoPhysicalBuilder physicalBuilder =
Expand Down
23 changes: 23 additions & 0 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,29 @@ pub extern "C" fn writeBatch(
return ret;
}

/// C-ABI entry point that returns the graph statistics blob for `snapshot_id`.
///
/// On success the blob is attached to a success response. If fetching the blob fails, an
/// error response carrying the `Debug` rendering of the error is returned. If attaching the
/// blob fails, the response is flipped to failed and the error text is stored on it.
#[no_mangle]
pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
    trace!("getGraphStatistics");
    // SAFETY: dereferences the raw handle as a `GraphStore`.
    // NOTE(review): this relies on the caller passing a handle that points to a live
    // `GraphStore` - confirm against the code that creates and releases handles.
    let graph_store = unsafe { &*(ptr as *const GraphStore) };

    let blob = match graph_store.get_graph_statistics_blob(snapshot_id) {
        Ok(blob) => blob,
        Err(err) => {
            let msg = format!("{:?}", err);
            return JnaResponse::new_error(&msg);
        }
    };

    let mut response = JnaResponse::new_success();
    if let Err(err) = response.data(blob) {
        // The blob could not be attached: report failure instead of an empty success.
        response.success(false);
        let msg = format!("{:?}", err);
        response.err_msg(&msg);
    }
    response
}

fn do_write_batch<G: MultiVersionGraph>(
graph: &G, snapshot_id: SnapshotId, buf: &[u8],
) -> GraphResult<bool> {
Expand Down
74 changes: 73 additions & 1 deletion interactive_engine/executor/store/groot/src/db/api/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use super::{GraphResult, PropertyId};
use crate::db::api::property::Value;
use crate::db::api::{EdgeKind, LabelId};
use crate::db::common::bytes::util::parse_pb;
use crate::db::proto::model::{EdgeTableIdEntry, GraphDefPb, VertexTableIdEntry};
use crate::db::proto::model::{
EdgeTableIdEntry, GraphDefPb, Statistics as StatisticsPb,
Statistics_EdgeTypeStatistics as EdgeTypeStatisticsPb,
Statistics_VertexTypeStatistics as VertexTypeStatisticsPb, VertexTableIdEntry,
};
use crate::db::proto::schema_common::{PropertyDefPb, TypeDefPb, TypeEnumPb};

#[derive(Default, Clone)]
Expand Down Expand Up @@ -388,6 +392,74 @@ impl PropDef {
}
}

/// Vertex and edge counters (totals plus per-type breakdowns) tagged with a version.
///
/// Can be converted into the `Statistics` protobuf message with
/// [`GraphPartitionStatistics::to_proto`].
#[derive(Default, Clone)]
pub struct GraphPartitionStatistics {
    /// Written to the protobuf `snapshotId` field by `to_proto`.
    version: i64,
    /// Total number of vertices.
    vertex_count: u64,
    /// Total number of edges.
    edge_count: u64,
    /// Vertex count per label id.
    vertex_type_count: HashMap<LabelId, u64>,
    /// Edge count per edge kind.
    edge_type_count: HashMap<EdgeKind, u64>,
}

impl GraphPartitionStatistics {
    /// Creates a statistics record from already-computed counters.
    pub fn new(
        version: i64, vertex_count: u64, edge_count: u64, vertex_type_count: HashMap<LabelId, u64>,
        edge_type_count: HashMap<EdgeKind, u64>,
    ) -> Self {
        Self {
            version,
            vertex_count,
            edge_count,
            vertex_type_count,
            edge_type_count,
        }
    }

    /// Returns the version this record was created with.
    pub fn get_version(&self) -> i64 {
        self.version
    }

    /// Returns the total vertex count.
    pub fn get_vertex_count(&self) -> u64 {
        self.vertex_count
    }

    /// Returns the total edge count.
    pub fn get_edge_count(&self) -> u64 {
        self.edge_count
    }

    /// Returns the vertex count recorded for `v_type`, or 0 when the label has no entry.
    pub fn get_vertex_type_count(&self, v_type: LabelId) -> u64 {
        self.vertex_type_count
            .get(&v_type)
            .copied()
            .unwrap_or(0)
    }

    /// Returns the edge count recorded for `e_type`, or 0 when the kind has no entry.
    pub fn get_edge_type_count(&self, e_type: EdgeKind) -> u64 {
        self.edge_type_count
            .get(&e_type)
            .copied()
            .unwrap_or(0)
    }

    /// Converts this record into a `Statistics` protobuf message.
    ///
    /// The version becomes `snapshotId`, the totals become `numVertices` / `numEdges`, and
    /// every map entry is appended as one per-type statistics element. Entries follow the
    /// maps' iteration order. The visible code path always returns `Ok`.
    pub fn to_proto(&self) -> GraphResult<StatisticsPb> {
        let mut statistics = StatisticsPb::new();
        statistics.set_snapshotId(self.version);
        statistics.set_numVertices(self.vertex_count);
        statistics.set_numEdges(self.edge_count);

        for (&label_id, &num_vertices) in self.vertex_type_count.iter() {
            let mut entry = VertexTypeStatisticsPb::new();
            entry.mut_labelId().set_id(label_id);
            entry.set_numVertices(num_vertices);
            statistics.mut_vertexTypeStatistics().push(entry);
        }

        for (edge_kind, &num_edges) in self.edge_type_count.iter() {
            let mut entry = EdgeTypeStatisticsPb::new();
            entry.set_edgeKind(edge_kind.to_proto());
            entry.set_numEdges(num_edges);
            statistics.mut_edgeTypeStatistics().push(entry);
        }

        Ok(statistics)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit aa60805

Please sign in to comment.