diff --git a/charts/gie-standalone/templates/frontend/statefulset.yaml b/charts/gie-standalone/templates/frontend/statefulset.yaml index 5f3c0ce11c33..96c3d220bee5 100644 --- a/charts/gie-standalone/templates/frontend/statefulset.yaml +++ b/charts/gie-standalone/templates/frontend/statefulset.yaml @@ -127,8 +127,8 @@ spec: value: {{ .Values.frontendQueryPerSecondLimit | quote }} - name: GREMLIN_SCRIPT_LANGUAGE_NAME value: {{ .Values.gremlinScriptLanguageName | quote }} - - name: PHYSICAL_OPT_CONFIG - value: {{ .Values.physicalOptConfig}} + - name: GRAPH_PHYSICAL_OPT + value: {{ .Values.graphPhysicalOpt | quote }} ports: - name: gremlin containerPort: {{ .Values.frontend.service.gremlinPort }} diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 6da23ad68138..d202ff52d74c 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -58,6 +58,14 @@ data: gremlin.server.port=12312 ## disable neo4j when launching groot server by default neo4j.bolt.server.disabled=true + + ## GOpt config + graph.planner.is.on={{ .Values.graphPlannerIsOn }} + graph.planner.opt={{ .Values.graphPlannerOpt }} + graph.planner.rules={{ .Values.graphPlannerRules }} + graph.physical.opt={{ .Values.graphPhysicalOpt }} + gremlin.script.language.name={{ .Values.gremlinScriptLanguageName }} + query.execution.timeout.ms={{ .Values.queryExecutionTimeoutMs }} log4rs.config=LOG4RS_CONFIG ## Auth config @@ -88,6 +96,7 @@ data: offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }} file.meta.store.path={{ .Values.fileMetaStorePath }} log.recycle.enable={{ .Values.logRecycleEnable }} + collect.statistics={{ .Values.collectStatistics }} log.recycle.offset.reserve={{ .Values.logRecycleOffsetReserve }} ## Extra Config diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 5c0c07a7a0f5..635adc68b9ec 100644 --- 
a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -488,6 +488,15 @@ zkBasePath: "/graphscope/groot" ## Frontend Config # gremlinServerPort: 12312 +## GOpt config +## To adopt a CBO planner, set graphPlannerOpt to CBO, set gremlinScriptLanguageName to antlr_gremlin_calcite, and set graphPhysicalOpt to proto. +graphPlannerIsOn: true +graphPlannerOpt: RBO +graphPlannerRules: FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule, ExpandGetVFusionRule +gremlinScriptLanguageName: antlr_gremlin_traversal +graphPhysicalOpt: ffi +queryExecutionTimeoutMs: 600000 + ## Key-value pair separated by ; ## For example extraConfig="k1=v1;k2=v2" extraConfig: "" @@ -532,3 +541,5 @@ uptrace: distributed: enabled: false + +collectStatistics: false \ No newline at end of file diff --git a/interactive_engine/assembly/src/bin/groot/store_ctl.sh b/interactive_engine/assembly/src/bin/groot/store_ctl.sh index 7e96473398ca..655b00875dd5 100755 --- a/interactive_engine/assembly/src/bin/groot/store_ctl.sh +++ b/interactive_engine/assembly/src/bin/groot/store_ctl.sh @@ -45,6 +45,8 @@ _setup_env() { mkdir -p ${LOG_DIR} + export OTEL_SDK_DISABLED="${OTEL_SDK_DISABLED:-true}" + export LD_LIBRARY_PATH=${GROOT_HOME}/native:${GROOT_HOME}/native/lib:${LD_LIBRARY_PATH}:/usr/local/lib libpath="$(echo "${GROOT_HOME}"/lib/*.jar | tr ' ' ':')" } diff --git a/interactive_engine/assembly/src/conf/groot/config.template b/interactive_engine/assembly/src/conf/groot/config.template index 6fb14b964f85..492e93f28191 100644 --- a/interactive_engine/assembly/src/conf/groot/config.template +++ b/interactive_engine/assembly/src/conf/groot/config.template @@ -34,4 +34,4 @@ pegasus.worker.num=2 pegasus.hosts=localhost:8080 kafka.test.cluster.enable=true -OTEL_SDK_DISABLED=true +collect.statistics=false diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java 
b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index ce5ff432f524..a48f9a40fbef 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -76,4 +76,7 @@ public class CommonConfig { // Only available in multi pod mode. public static final Config WRITE_HA_ENABLED = Config.boolConfig("write.ha.enabled", false); + + public static final Config COLLECT_STATISTICS = + Config.boolConfig("collect.statistics", false); } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/api/SchemaFetcher.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/api/SchemaFetcher.java index 44677a4196e5..df74a612cdde 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/api/SchemaFetcher.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/api/SchemaFetcher.java @@ -20,6 +20,8 @@ public interface SchemaFetcher { Map getSchemaSnapshotPair(); + GraphStatistics getStatistics(); + int getPartitionNum(); int getVersion(); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphStatistics.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphStatistics.java index 1c029c0e3dfe..e873e6eb9182 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphStatistics.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/impl/DefaultGraphStatistics.java @@ -18,11 +18,13 @@ import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; import com.alibaba.graphscope.groot.common.schema.wrapper.EdgeKind; import 
com.alibaba.graphscope.groot.common.schema.wrapper.LabelId; +import com.alibaba.graphscope.proto.groot.Statistics; import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -94,4 +96,32 @@ public Long getEdgeTypeCount( return count == null ? 0L : count; } + + public static DefaultGraphStatistics parseProto(Statistics statistics) { + long vcount = statistics.getNumVertices(); + long ecount = statistics.getNumEdges(); + Map vertexTypeCounts = new HashMap<>(); + Map edgeTypeCounts = new HashMap<>(); + for (Statistics.VertexTypeStatistics sts : statistics.getVertexTypeStatisticsList()) { + vertexTypeCounts.put(LabelId.parseProto(sts.getLabelId()), sts.getNumVertices()); + } + for (Statistics.EdgeTypeStatistics sts : statistics.getEdgeTypeStatisticsList()) { + edgeTypeCounts.put(EdgeKind.parseProto(sts.getEdgeKind()), sts.getNumEdges()); + } + return new DefaultGraphStatistics(vertexTypeCounts, edgeTypeCounts, vcount, ecount); + } + + @Override + public String toString() { + return "DefaultGraphStatistics{" + + "vertexTypeCounts=" + + vertexTypeCounts + + ", edgeTypeCounts=" + + edgeTypeCounts + + ", totalVertexCount=" + + totalVertexCount + + ", totalEdgeCount=" + + totalEdgeCount + + '}'; + } } diff --git a/interactive_engine/compiler/Makefile b/interactive_engine/compiler/Makefile index 03b6ccd9c5f4..b1a12978074d 100644 --- a/interactive_engine/compiler/Makefile +++ b/interactive_engine/compiler/Makefile @@ -59,7 +59,7 @@ run: -Dgraph.store=${graph.store} \ -Dpegasus.hosts=${pegasus.hosts} \ -Dgremlin.script.language.name=${gremlin.script.language.name} \ - -Dphysical.opt.config=${physical.opt.config} \ + -Dgraph.physical.opt=${graph.physical.opt} \ -Dgraph.planner.rules=${graph.planner.rules} \ -Dgraph.planner.opt=${graph.planner.opt} \ -Dgraph.statistics=${graph.statistics} \ diff --git 
a/interactive_engine/compiler/conf/ir.compiler.properties b/interactive_engine/compiler/conf/ir.compiler.properties index 813297331f5e..955ba0607a96 100644 --- a/interactive_engine/compiler/conf/ir.compiler.properties +++ b/interactive_engine/compiler/conf/ir.compiler.properties @@ -57,7 +57,7 @@ calcite.default.charset: UTF-8 # gremlin.script.language.name: antlr_gremlin_traversal # the output plan format, can be ffi(default) or proto -# physical.opt.config: ffi +# graph.physical.opt: ffi # set the max capacity of the result streaming buffer for each query # per.query.stream.buffer.max.capacity: 256 diff --git a/interactive_engine/compiler/ir_experimental_advanced_ci.sh b/interactive_engine/compiler/ir_experimental_advanced_ci.sh index 1804a2d4d430..dfae19bb7ff9 100755 --- a/interactive_engine/compiler/ir_experimental_advanced_ci.sh +++ b/interactive_engine/compiler/ir_experimental_advanced_ci.sh @@ -34,7 +34,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=0 ./start_rp cd ${base_dir}/../executor/ir/target/release && RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 & sleep 10 -cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule & +cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 
graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule & sleep 5s cd ${base_dir} && make pattern_test && make ldbc_test && make simple_test exit_code=$? diff --git a/interactive_engine/compiler/ir_experimental_ci.sh b/interactive_engine/compiler/ir_experimental_ci.sh index 28f385c44ee0..22b9ac6617eb 100755 --- a/interactive_engine/compiler/ir_experimental_ci.sh +++ b/interactive_engine/compiler/ir_experimental_ci.sh @@ -21,7 +21,7 @@ fi # Test2: run gremlin standard tests on experimental store via calcite-based ir # restart compiler service -cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json & +cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json & sleep 5s # run gremlin standard tests to test calcite-based IR layer cd ${base_dir} && make gremlin_calcite_test @@ -42,7 +42,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=0 ./start_ cd ${base_dir}/../executor/ir/target/release && RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 & # start compiler service -cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 & +cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 & sleep 5s cd ${base_dir} && make 
gremlin_calcite_test exit_code=$? @@ -76,7 +76,7 @@ fi # Test5: run cypher movie tests on experimental store via calcite-based ir # restart compiler service -cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json physical.opt.config=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule & +cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json graph.physical.opt=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule & sleep 10s export ENGINE_TYPE=pegasus cd ${base_dir} && make cypher_test diff --git a/interactive_engine/compiler/set_properties.sh b/interactive_engine/compiler/set_properties.sh index 7ccecb159d6c..20fe83649343 100755 --- a/interactive_engine/compiler/set_properties.sh +++ b/interactive_engine/compiler/set_properties.sh @@ -34,7 +34,7 @@ frontend_query_per_second_limit="frontend.query.per.second.limit: $FRONTEND_QUER gremlin_script_language_name="gremlin.script.language.name: $GREMLIN_SCRIPT_LANGUAGE_NAME" -physical_opt_config="physical.opt.config: $PHYSICAL_OPT_CONFIG" +graph_physical_opt="graph.physical.opt: $GRAPH_PHYSICAL_OPT" count=1; while (($count<$SERVERSSIZE)) @@ -47,6 +47,6 @@ done graph_schema="graph.schema: $GRAPH_SCHEMA" -properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$physical_opt_config" 
+properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$graph_physical_opt" echo -e $properties > ./conf/ir.compiler.properties diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java index b4add388a962..361cf259343e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java @@ -55,8 +55,8 @@ public class FrontendConfig { public static final Config GREMLIN_SCRIPT_LANGUAGE_NAME = Config.stringConfig("gremlin.script.language.name", "antlr_gremlin_traversal"); - public static final Config PHYSICAL_OPT_CONFIG = - Config.stringConfig("physical.opt.config", "ffi"); + public static final Config GRAPH_PHYSICAL_OPT = + Config.stringConfig("graph.physical.opt", "ffi"); public static final Config PER_QUERY_STREAM_BUFFER_MAX_CAPACITY = Config.intConfig("per.query.stream.buffer.max.capacity", 256); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java index 4e3f65d1b50a..b02324527e1c 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java @@ -131,7 +131,7 @@ public PhysicalPlan planPhysical(LogicalPlan logicalPlan) { if (logicalPlan.isReturnEmpty()) { return PhysicalPlan.createEmpty(); } else if (logicalPlan.getRegularQuery() != null) { - String physicalOpt = 
FrontendConfig.PHYSICAL_OPT_CONFIG.get(graphConfig); + String physicalOpt = FrontendConfig.GRAPH_PHYSICAL_OPT.get(graphConfig); if ("proto".equals(physicalOpt.toLowerCase())) { logger.debug("physical type is proto"); try (GraphRelProtoPhysicalBuilder physicalBuilder = diff --git a/interactive_engine/executor/assembly/groot/src/store/graph.rs b/interactive_engine/executor/assembly/groot/src/store/graph.rs index a4b18e8f91cc..1c75e09872e6 100644 --- a/interactive_engine/executor/assembly/groot/src/store/graph.rs +++ b/interactive_engine/executor/assembly/groot/src/store/graph.rs @@ -127,6 +127,29 @@ pub extern "C" fn writeBatch( return ret; } +#[no_mangle] +pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box { + trace!("getGraphStatistics"); + unsafe { + let graph_store_ptr = &*(ptr as *const GraphStore); + match graph_store_ptr.get_graph_statistics_blob(snapshot_id) { + Ok(blob) => { + let mut response = JnaResponse::new_success(); + if let Err(e) = response.data(blob) { + response.success(false); + let msg = format!("{:?}", e); + response.err_msg(&msg); + } + response + } + Err(e) => { + let msg = format!("{:?}", e); + JnaResponse::new_error(&msg) + } + } + } +} + fn do_write_batch( graph: &G, snapshot_id: SnapshotId, buf: &[u8], ) -> GraphResult { diff --git a/interactive_engine/executor/store/groot/src/db/api/schema.rs b/interactive_engine/executor/store/groot/src/db/api/schema.rs index 81d1853cb8ac..42088d5a3948 100644 --- a/interactive_engine/executor/store/groot/src/db/api/schema.rs +++ b/interactive_engine/executor/store/groot/src/db/api/schema.rs @@ -9,7 +9,11 @@ use super::{GraphResult, PropertyId}; use crate::db::api::property::Value; use crate::db::api::{EdgeKind, LabelId}; use crate::db::common::bytes::util::parse_pb; -use crate::db::proto::model::{EdgeTableIdEntry, GraphDefPb, VertexTableIdEntry}; +use crate::db::proto::model::{ + EdgeTableIdEntry, GraphDefPb, Statistics as StatisticsPb, + Statistics_EdgeTypeStatistics as 
EdgeTypeStatisticsPb, + Statistics_VertexTypeStatistics as VertexTypeStatisticsPb, VertexTableIdEntry, +}; use crate::db::proto::schema_common::{PropertyDefPb, TypeDefPb, TypeEnumPb}; #[derive(Default, Clone)] @@ -388,6 +392,74 @@ impl PropDef { } } +#[derive(Default, Clone)] +pub struct GraphPartitionStatistics { + version: i64, + vertex_count: u64, + edge_count: u64, + vertex_type_count: HashMap, + edge_type_count: HashMap, +} + +impl GraphPartitionStatistics { + pub fn new( + version: i64, vertex_count: u64, edge_count: u64, vertex_type_count: HashMap, + edge_type_count: HashMap, + ) -> Self { + GraphPartitionStatistics { version, vertex_count, edge_count, vertex_type_count, edge_type_count } + } + + pub fn get_version(&self) -> i64 { + self.version + } + + pub fn get_vertex_count(&self) -> u64 { + self.vertex_count + } + + pub fn get_edge_count(&self) -> u64 { + self.edge_count + } + + pub fn get_vertex_type_count(&self, v_type: LabelId) -> u64 { + self.vertex_type_count + .get(&v_type) + .unwrap_or(&0) + .clone() + } + + pub fn get_edge_type_count(&self, e_type: EdgeKind) -> u64 { + self.edge_type_count + .get(&e_type) + .unwrap_or(&0) + .clone() + } + + pub fn to_proto(&self) -> GraphResult { + let mut pb = StatisticsPb::new(); + pb.set_snapshotId(self.version); + pb.set_numVertices(self.vertex_count as u64); + pb.set_numEdges(self.edge_count as u64); + for (label_id, count) in &self.vertex_type_count { + let mut vertex_type_statistics = VertexTypeStatisticsPb::new(); + vertex_type_statistics + .mut_labelId() + .set_id(*label_id); + vertex_type_statistics.set_numVertices(*count); + pb.mut_vertexTypeStatistics() + .push(vertex_type_statistics); + } + for (edge_kind, count) in &self.edge_type_count { + let mut edge_type_statistics = EdgeTypeStatisticsPb::new(); + edge_type_statistics.set_edgeKind(edge_kind.to_proto()); + edge_type_statistics.set_numEdges(*count); + pb.mut_edgeTypeStatistics() + .push(edge_type_statistics); + } + Ok(pb) + } +} + #[cfg(test)] mod 
tests { use super::*; diff --git a/interactive_engine/executor/store/groot/src/db/graph/store.rs b/interactive_engine/executor/store/groot/src/db/graph/store.rs index 5321cad5bdb4..b04962305259 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/store.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/store.rs @@ -23,7 +23,7 @@ use crate::db::api::GraphErrorCode::{InvalidData, TypeNotFound}; use crate::db::api::*; use crate::db::common::bytes::transform; use crate::db::graph::entity::{RocksEdgeImpl, RocksVertexImpl}; -use crate::db::graph::iter::{EdgeTypeScan, VertexTypeScan}; +use crate::db::graph::iter::{EdgeKindScan, EdgeTypeScan, VertexTypeScan}; use crate::db::graph::table_manager::Table; use crate::db::storage::rocksdb::{RocksDB, RocksDBBackupEngine}; use crate::db::storage::RawBytes; @@ -1076,6 +1076,71 @@ impl GraphStore { Err(err) => Err(err), } } + + pub fn get_graph_statistics_blob(&self, si: SnapshotId) -> GraphResult> { + let statistics = self.get_statistics(si)?; + let pb = statistics.to_proto()?; + pb.write_to_bytes() + .map_err(|e| GraphError::new(InvalidData, format!("{:?}", e))) + } + + pub fn get_statistics(&self, si: SnapshotId) -> GraphResult { + let vertex_labels_statistics = self.get_vertex_statistics(si)?; + let edge_labels_statistics = self.get_edge_statistics(si)?; + let vertex_count = vertex_labels_statistics.values().sum(); + let edge_count = edge_labels_statistics.values().sum(); + Ok(GraphPartitionStatistics::new( + si, + vertex_count, + edge_count, + vertex_labels_statistics, + edge_labels_statistics, + )) + } + + fn get_vertex_statistics(&self, si: SnapshotId) -> GraphResult> { + let guard = epoch::pin(); + let map = self.vertex_manager.get_map(&guard); + let map_ref = unsafe { map.deref() }; + let vertex_label_ids = map_ref + .keys() + .cloned() + .collect::>(); + let mut vertex_label_counts = HashMap::new(); + for label_id in vertex_label_ids { + let label_count = self + .scan_vertex(si, Some(label_id), 
None, None)? + .count(); + vertex_label_counts.insert(label_id, label_count as u64); + } + + Ok(vertex_label_counts) + } + + fn get_edge_statistics(&self, si: SnapshotId) -> GraphResult> { + let guard = epoch::pin(); + let inner = self.edge_manager.get_inner(&guard); + let edge_mgr = unsafe { inner.deref() }; + let edge_kinds = edge_mgr.get_edge_kinds(); + let mut edge_kind_counts = HashMap::new(); + for edge_kind in edge_kinds { + let edge_kind_info = self + .edge_manager + .get_edge_kind(si, &edge_kind)?; + let kind_iter = EdgeKindScan::new( + self.storage.clone(), + si, + edge_kind_info, + None, + EdgeDirection::Both, + false, + ) + .into_iter(); + let edge_count = kind_iter.count(); + edge_kind_counts.insert(edge_kind.clone(), edge_count as u64); + } + Ok(edge_kind_counts) + } } fn merge_updates<'a>(old: &mut HashMap>, updates: &'a dyn PropertyMap) { diff --git a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs index b67e131deb7d..78f1c6a2e1ab 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] +use std::collections::hash_map::Keys; use std::collections::hash_map::Values; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -526,4 +527,9 @@ impl EdgeManagerInner { debug!("EdgeManagerInner::get_all_edges"); self.info_map.values() } + + pub(crate) fn get_edge_kinds(&self) -> Keys>> { + debug!("EdgeManagerInner::get_edge_kinds"); + self.type_map.keys() + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java index 2524efbbd21a..67b9755b54e6 100644 --- 
a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java @@ -17,9 +17,7 @@ import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.rpc.RpcChannel; import com.alibaba.graphscope.groot.rpc.RpcClient; -import com.alibaba.graphscope.proto.groot.AdvanceQuerySnapshotRequest; -import com.alibaba.graphscope.proto.groot.AdvanceQuerySnapshotResponse; -import com.alibaba.graphscope.proto.groot.FrontendSnapshotGrpc; +import com.alibaba.graphscope.proto.groot.*; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; @@ -68,4 +66,22 @@ public void onError(Throwable throwable) { public void onCompleted() {} }); } + + public void syncStatistics(Statistics statistics) { + getStub() + .syncStatistics( + SyncStatisticsRequest.newBuilder().setStatistics(statistics).build(), + new StreamObserver() { + @Override + public void onNext(SyncStatisticsResponse value) {} + + @Override + public void onError(Throwable t) { + logger.error("Failed sync statistics to frontend", t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDefFetcher.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDefFetcher.java index 6eb880443f87..f54918a971a0 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDefFetcher.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDefFetcher.java @@ -13,18 +13,72 @@ */ package com.alibaba.graphscope.groot.coordinator; +import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import 
com.alibaba.graphscope.groot.rpc.RoleClients; +import com.alibaba.graphscope.proto.groot.FetchStatisticsResponse; +import com.alibaba.graphscope.proto.groot.Statistics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; public class GraphDefFetcher { + private static final Logger logger = LoggerFactory.getLogger(GraphDefFetcher.class); - private RoleClients storeSchemaClients; + private final RoleClients storeSchemaClients; + int storeCount; - public GraphDefFetcher(RoleClients storeSchemaClients) { + public GraphDefFetcher(RoleClients storeSchemaClients, int storeCount) { this.storeSchemaClients = storeSchemaClients; + this.storeCount = storeCount; } public GraphDef fetchGraphDef() { return storeSchemaClients.getClient(0).fetchSchema(); } + + public Map fetchStatistics() { + Map statisticsMap = new ConcurrentHashMap<>(); + CountDownLatch countDownLatch = new CountDownLatch(storeCount); + + for (int i = 0; i < storeCount; ++i) { + storeSchemaClients + .getClient(i) + .fetchStatistics( + new CompletionCallback() { + @Override + public void onCompleted(FetchStatisticsResponse res) { + statisticsMap.putAll(res.getStatisticsMapMap()); + finish(null); + } + + @Override + public void onError(Throwable t) { + logger.error("failed to fetch statistics", t); + finish(t); + } + + private void finish(Throwable t) { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException e) { + logger.error("fetch statistics has been interrupted", e); + } + if (statisticsMap.size() != storeCount) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore interrupt status instead of swallowing it + } + } + return statisticsMap; + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index 6586beaaa338..7a0e53b09330 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -14,6 +14,8 @@ package com.alibaba.graphscope.groot.coordinator; import com.alibaba.graphscope.groot.CompletionCallback; +import com.alibaba.graphscope.groot.common.config.CommonConfig; +import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.exception.ServiceNotReadyException; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.ThreadFactoryUtils; @@ -21,15 +23,21 @@ import com.alibaba.graphscope.groot.operation.BatchId; import com.alibaba.graphscope.groot.operation.Operation; import com.alibaba.graphscope.groot.operation.OperationBatch; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.schema.ddl.DdlExecutors; import com.alibaba.graphscope.groot.schema.ddl.DdlResult; import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch; +import com.alibaba.graphscope.proto.groot.EdgeKindPb; +import com.alibaba.graphscope.proto.groot.LabelIdPb; +import com.alibaba.graphscope.proto.groot.Statistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -42,26 +50,41 @@ public class SchemaManager { private final DdlExecutors ddlExecutors; private final GraphDefFetcher graphDefFetcher; + private final RoleClients frontendSnapshotClients; + + private final int frontendCount; + private final AtomicReference graphDefRef; + private final AtomicReference graphStatistics; 
private final int partitionCount; private volatile boolean ready = false; + private final boolean collectStatistics; private ExecutorService singleThreadExecutor; - private ScheduledExecutorService scheduler; + private ScheduledExecutorService syncSchemaScheduler; + + private ScheduledExecutorService fetchStatisticsScheduler; public SchemaManager( + Configs configs, SnapshotManager snapshotManager, DdlExecutors ddlExecutors, DdlWriter ddlWriter, MetaService metaService, - GraphDefFetcher graphDefFetcher) { + GraphDefFetcher graphDefFetcher, + RoleClients frontendSnapshotClients) { this.snapshotManager = snapshotManager; this.ddlExecutors = ddlExecutors; this.ddlWriter = ddlWriter; this.graphDefFetcher = graphDefFetcher; + this.frontendSnapshotClients = frontendSnapshotClients; this.graphDefRef = new AtomicReference<>(); this.partitionCount = metaService.getPartitionCount(); + this.graphStatistics = new AtomicReference<>(); + + this.frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); + this.collectStatistics = CommonConfig.COLLECT_STATISTICS.get(configs); } public void start() { @@ -78,11 +101,92 @@ public void start() { recover(); logger.info(graphDefRef.get().toProto().toString()); - this.scheduler = + this.syncSchemaScheduler = Executors.newSingleThreadScheduledExecutor( ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "recover", logger)); - this.scheduler.scheduleWithFixedDelay(this::recover, 5, 2, TimeUnit.SECONDS); + this.syncSchemaScheduler.scheduleWithFixedDelay(this::recover, 5, 2, TimeUnit.SECONDS); + + if (this.collectStatistics) { + this.fetchStatisticsScheduler = + Executors.newSingleThreadScheduledExecutor( + ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( + "fetch-statistics", logger)); + this.fetchStatisticsScheduler.scheduleWithFixedDelay( + this::syncStatistics, 5, 60, TimeUnit.MINUTES); + } + } + + private void syncStatistics() { + try { + Map statisticsMap = graphDefFetcher.fetchStatistics(); + Statistics 
statistics = aggregateStatistics(statisticsMap); + this.graphStatistics.set(statistics); + } catch (Exception e) { + logger.error("Fetch statistics failed", e); + } + sendStatisticsToFrontend(); + } + + private void sendStatisticsToFrontend() { + Statistics statistics = this.graphStatistics.get(); + if (statistics != null) { + for (int i = 0; i < frontendCount; ++i) { + try { + frontendSnapshotClients.getClient(i).syncStatistics(statistics); + logger.debug("Sent statistics to frontend#{}", i); + } catch (Exception e) { + logger.error("Failed to sync statistics to frontend", e); + } + } + } + } + + private Statistics aggregateStatistics(Map statisticsMap) { + Statistics.Builder builder = Statistics.newBuilder(); + long numVertices = 0; + long numEdges = 0; + Map vertexMap = new HashMap<>(); + Map edgeKindMap = new HashMap<>(); + + for (Statistics statistics : statisticsMap.values()) { + numVertices += statistics.getNumVertices(); + numEdges += statistics.getNumEdges(); + + for (Statistics.VertexTypeStatistics subStatistics : + statistics.getVertexTypeStatisticsList()) { + long count = subStatistics.getNumVertices(); + int labelId = subStatistics.getLabelId().getId(); + vertexMap.compute(labelId, (k, v) -> (v == null) ? count : v + count); + } + + for (Statistics.EdgeTypeStatistics subStatistics : + statistics.getEdgeTypeStatisticsList()) { + long count = subStatistics.getNumEdges(); + EdgeKindPb edgeKindPb = subStatistics.getEdgeKind(); + edgeKindMap.compute(edgeKindPb, (k, v) -> (v == null) ? 
count : v + count); + } + } + builder.setSnapshotId(0); // TODO(siyuan): set this + builder.setNumVertices(numVertices).setNumEdges(numEdges); + for (Map.Entry entry : vertexMap.entrySet()) { + int labelId = entry.getKey(); + LabelIdPb labelIdPb = LabelIdPb.newBuilder().setId(labelId).build(); + Long count = entry.getValue(); + builder.addVertexTypeStatistics( + Statistics.VertexTypeStatistics.newBuilder() + .setLabelId(labelIdPb) + .setNumVertices(count)); + } + for (Map.Entry entry : edgeKindMap.entrySet()) { + EdgeKindPb edgeKindPb = entry.getKey(); + Long count = entry.getValue(); + builder.addEdgeTypeStatistics( + Statistics.EdgeTypeStatistics.newBuilder() + .setEdgeKind(edgeKindPb) + .setNumEdges(count)); + } + return builder.build(); } private void recover() { @@ -180,7 +284,7 @@ public void submitBatchDdl( e); this.ready = false; callback.onError(e); - this.singleThreadExecutor.execute(() -> recover()); + this.singleThreadExecutor.execute(this::recover); } }); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreSchemaClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreSchemaClient.java index f49edef51122..ff60ac210b71 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreSchemaClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreSchemaClient.java @@ -13,14 +13,14 @@ */ package com.alibaba.graphscope.groot.coordinator; +import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.rpc.RpcChannel; import com.alibaba.graphscope.groot.rpc.RpcClient; -import com.alibaba.graphscope.proto.groot.FetchSchemaRequest; -import com.alibaba.graphscope.proto.groot.FetchSchemaResponse; -import com.alibaba.graphscope.proto.groot.StoreSchemaGrpc; +import 
com.alibaba.graphscope.proto.groot.*; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; public class StoreSchemaClient extends RpcClient { public StoreSchemaClient(RpcChannel channel) { @@ -35,9 +35,36 @@ private StoreSchemaGrpc.StoreSchemaBlockingStub getStub() { return StoreSchemaGrpc.newBlockingStub(rpcChannel.getChannel()); } + private StoreSchemaGrpc.StoreSchemaStub getAsyncStub() { + return StoreSchemaGrpc.newStub(rpcChannel.getChannel()); + } + public GraphDef fetchSchema() { StoreSchemaGrpc.StoreSchemaBlockingStub stub = getStub(); FetchSchemaResponse response = stub.fetchSchema(FetchSchemaRequest.newBuilder().build()); return GraphDef.parseProto(response.getGraphDef()); } + + public void fetchStatistics(CompletionCallback callback) { + long snapshotId = Long.MAX_VALUE - 1; + FetchStatisticsRequest request = + FetchStatisticsRequest.newBuilder().setSnapshotId(snapshotId).build(); + getAsyncStub() + .fetchStatistics( + request, + new StreamObserver<>() { + @Override + public void onNext(FetchStatisticsResponse value) { + callback.onCompleted(value); + } + + @Override + public void onError(Throwable t) { + callback.onError(t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/BackupManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/BackupManager.java index d2b928f19695..f6210d967357 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/BackupManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/BackupManager.java @@ -14,7 +14,6 @@ package com.alibaba.graphscope.groot.coordinator.backup; import com.alibaba.graphscope.groot.CompletionCallback; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.SnapshotWithSchema; import 
com.alibaba.graphscope.groot.common.config.BackupConfig; import com.alibaba.graphscope.groot.common.config.CommonConfig; @@ -26,6 +25,7 @@ import com.alibaba.graphscope.groot.coordinator.QuerySnapshotListener; import com.alibaba.graphscope.groot.coordinator.SchemaManager; import com.alibaba.graphscope.groot.coordinator.SnapshotManager; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.meta.FileMetaStore; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.meta.MetaStore; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/LocalSnapshotListener.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/LocalSnapshotListener.java index d14de6dd9001..73e731619d8c 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/LocalSnapshotListener.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/backup/LocalSnapshotListener.java @@ -13,10 +13,10 @@ */ package com.alibaba.graphscope.groot.coordinator.backup; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.coordinator.QuerySnapshotListener; import com.alibaba.graphscope.groot.coordinator.SchemaManager; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java index 8d3ab2253310..d755fccdaf24 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java +++ 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java @@ -1,6 +1,5 @@ package com.alibaba.graphscope.groot.frontend; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; import com.alibaba.graphscope.groot.rpc.RoleClients; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 204ed7dca4f3..f12d95746229 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -14,7 +14,6 @@ package com.alibaba.graphscope.groot.frontend; import com.alibaba.graphscope.groot.CompletionCallback; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.api.*; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.unified.Graph; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java index ee08d48e06cf..4b0a5995b621 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java @@ -13,11 +13,10 @@ */ package com.alibaba.graphscope.groot.frontend; -import com.alibaba.graphscope.groot.SnapshotCache; +import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; +import 
com.alibaba.graphscope.groot.common.schema.impl.DefaultGraphStatistics; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; -import com.alibaba.graphscope.proto.groot.AdvanceQuerySnapshotRequest; -import com.alibaba.graphscope.proto.groot.AdvanceQuerySnapshotResponse; -import com.alibaba.graphscope.proto.groot.FrontendSnapshotGrpc; +import com.alibaba.graphscope.proto.groot.*; import io.grpc.stub.StreamObserver; @@ -53,4 +52,15 @@ public void advanceQuerySnapshot( observer.onError(e); } } + + @Override + public void syncStatistics( + SyncStatisticsRequest request, + StreamObserver responseObserver) { + Statistics statistics = request.getStatistics(); + GraphStatistics graphStatistics = DefaultGraphStatistics.parseProto(statistics); + snapshotCache.setGraphStatisticsRef(graphStatistics); + responseObserver.onNext(SyncStatisticsResponse.newBuilder().build()); + responseObserver.onCompleted(); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java index 73c08a928c74..188ed62ec5a3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java @@ -13,7 +13,6 @@ */ package com.alibaba.graphscope.groot.frontend; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.EdgeKind; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotCache.java similarity index 90% rename from 
interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java rename to interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotCache.java index 365e860ba228..39b53be82dd1 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotCache.java @@ -11,8 +11,11 @@ * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.graphscope.groot; +package com.alibaba.graphscope.groot.frontend; +import com.alibaba.graphscope.groot.SnapshotListener; +import com.alibaba.graphscope.groot.SnapshotWithSchema; +import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import org.slf4j.Logger; @@ -32,11 +35,14 @@ public class SnapshotCache { private final AtomicReference snapshotWithSchemaRef; + private final AtomicReference graphStatisticsRef; + private final TreeMap> snapshotToListeners; public SnapshotCache() { SnapshotWithSchema snapshotWithSchema = SnapshotWithSchema.newBuilder().build(); snapshotWithSchemaRef = new AtomicReference<>(snapshotWithSchema); + graphStatisticsRef = new AtomicReference<>(); this.snapshotToListeners = new TreeMap<>(); } @@ -127,4 +133,12 @@ public synchronized long advanceQuerySnapshotId(long snapshotId, GraphDef graphD public SnapshotWithSchema getSnapshotWithSchema() { return this.snapshotWithSchemaRef.get(); } + + public void setGraphStatisticsRef(GraphStatistics statistics) { + this.graphStatisticsRef.set(statistics); + } + + public GraphStatistics getGraphStatistics() { + return this.graphStatisticsRef.get(); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java index 32c8b7115bd1..646fbad49973 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WrappedSchemaFetcher.java @@ -13,9 +13,9 @@ */ package com.alibaba.graphscope.groot.frontend; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.SnapshotWithSchema; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; +import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; import com.alibaba.graphscope.groot.common.schema.api.SchemaFetcher; import com.alibaba.graphscope.groot.meta.MetaService; @@ -49,6 +49,11 @@ public Map getSchemaSnapshotPair() { return Map.of(snapshotId, schema); } + @Override + public GraphStatistics getStatistics() { + return this.snapshotCache.getGraphStatistics(); + } + @Override public int getPartitionNum() { return this.metaService.getPartitionCount(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 5ac0b5044230..2f69ad5b93b2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -1,7 +1,6 @@ package com.alibaba.graphscope.groot.frontend.write; import com.alibaba.graphscope.groot.CompletionCallback; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.GraphElement; @@ 
-12,6 +11,7 @@ import com.alibaba.graphscope.groot.common.schema.wrapper.LabelId; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.alibaba.graphscope.groot.common.util.*; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.operation.EdgeId; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.operation.OperationType; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/GraphPartition.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/GraphPartition.java index 67ca6179b73f..92078bbcb5a7 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/GraphPartition.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/GraphPartition.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.store.backup.GraphPartitionBackup; import com.alibaba.graphscope.groot.store.external.ExternalStorage; import com.alibaba.graphscope.proto.groot.GraphDefPb; +import com.alibaba.graphscope.proto.groot.Statistics; import java.io.Closeable; import java.io.IOException; @@ -33,6 +34,8 @@ public interface GraphPartition extends Closeable { GraphDefPb getGraphDefBlob() throws IOException; + Statistics getGraphStatisticsBlob(long si) throws IOException; + void ingestExternalFile(ExternalStorage storage, String fullPath) throws IOException; GraphPartitionBackup openBackupEngine(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java index 3f6e11c33e65..1f21cecbad05 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java +++ 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java @@ -13,10 +13,7 @@ */ package com.alibaba.graphscope.groot.store; -import com.alibaba.graphscope.proto.groot.FetchSchemaRequest; -import com.alibaba.graphscope.proto.groot.FetchSchemaResponse; -import com.alibaba.graphscope.proto.groot.GraphDefPb; -import com.alibaba.graphscope.proto.groot.StoreSchemaGrpc; +import com.alibaba.graphscope.proto.groot.*; import io.grpc.stub.StreamObserver; @@ -24,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; public class StoreSchemaService extends StoreSchemaGrpc.StoreSchemaImplBase { private static final Logger logger = LoggerFactory.getLogger(StoreSchemaService.class); @@ -47,4 +45,22 @@ public void fetchSchema( responseObserver.onError(e); } } + + @Override + public void fetchStatistics( + FetchStatisticsRequest request, + StreamObserver responseObserver) { + try { + Map map = + this.storeService.getGraphStatisticsBlob(request.getSnapshotId()); + logger.debug("Collected statistics :{}", map.size()); + FetchStatisticsResponse response = + FetchStatisticsResponse.newBuilder().putAllStatisticsMap(map).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (IOException e) { + logger.error("get statistics failed", e); + responseObserver.onError(e); + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 5a55fbc2832b..57f419af00c9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -25,6 +25,7 @@ import com.alibaba.graphscope.groot.store.external.ExternalStorage; import 
com.alibaba.graphscope.groot.store.jna.JnaGraphStore; import com.alibaba.graphscope.proto.groot.GraphDefPb; +import com.alibaba.graphscope.proto.groot.Statistics; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.Attributes; @@ -69,6 +70,7 @@ public class StoreService { private ExecutorService ingestExecutor; private ExecutorService garbageCollectExecutor; private ExecutorService compactExecutor; + private ExecutorService statisticsExecutor; private ThreadPoolExecutor downloadExecutor; private final boolean enableGc; @@ -107,7 +109,7 @@ public void start() throws IOException { writeThreadCount, writeThreadCount, 0L, - TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-write", logger)); @@ -116,7 +118,7 @@ public void start() throws IOException { 1, 1, 0L, - TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-ingest", logger)); @@ -134,7 +136,7 @@ public void start() throws IOException { 1, 1, 0L, - TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-garbage-collect", logger)); @@ -143,12 +145,21 @@ public void start() throws IOException { new ThreadPoolExecutor( 16, 16, - 1000L, - TimeUnit.MILLISECONDS, + 1L, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-download", logger)); this.downloadExecutor.allowCoreThreadTimeOut(true); + this.statisticsExecutor = + new ThreadPoolExecutor( + 8, + 16, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( + "store-statistics", logger)); logger.info("StoreService started. 
storeId [" + this.storeId + "]"); } @@ -292,6 +303,45 @@ public GraphDefPb getGraphDefBlob() throws IOException { return graphPartition.getGraphDefBlob(); } + public Map getGraphStatisticsBlob(long snapshotId) throws IOException { + int partitionCount = this.idToPartition.values().size(); + CountDownLatch countDownLatch = new CountDownLatch(partitionCount); + logger.info("Collect statistics of store#{} started", storeId); + Map statisticsMap = new ConcurrentHashMap<>(); + for (Map.Entry entry : idToPartition.entrySet()) { + this.statisticsExecutor.execute( + () -> { + try { + Statistics statistics = + entry.getValue().getGraphStatisticsBlob(snapshotId); + statisticsMap.put(entry.getKey(), statistics); + logger.debug("Collected statistics of partition#{}", entry.getKey()); + } catch (IOException e) { + logger.error( + "Collect statistics failed for partition {}", + entry.getKey(), + e); + } finally { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException e) { + logger.error("collect statistics has been interrupted", e); + } + if (statisticsMap.size() != partitionCount) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + // Ignore + } + } + logger.info("Collect statistics of store#{} done, size: {}", storeId, statisticsMap.size()); + return statisticsMap; + } + public MetaService getMetaService() { return this.metaService; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/GraphLibrary.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/GraphLibrary.java index f4b0077eeff5..ac376cec366e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/GraphLibrary.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/GraphLibrary.java @@ -28,6 +28,8 @@ public interface GraphLibrary extends Library { JnaResponse 
getGraphDefBlob(Pointer storePointer); + JnaResponse getGraphStatistics(Pointer storePointer, long snapshotId); + Pointer openGraphBackupEngine(Pointer storePointer, String backupPath); void closeGraphBackupEngine(Pointer bePointer); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java index a6d66e29b7e8..35c173d32bc5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.groot.store.backup.GraphPartitionBackup; import com.alibaba.graphscope.groot.store.external.ExternalStorage; import com.alibaba.graphscope.proto.groot.GraphDefPb; +import com.alibaba.graphscope.proto.groot.Statistics; import com.sun.jna.Pointer; import org.apache.commons.io.FileUtils; @@ -119,6 +120,17 @@ public GraphDefPb getGraphDefBlob() throws IOException { } } + @Override + public Statistics getGraphStatisticsBlob(long si) throws IOException { + try (JnaResponse jnaResponse = GraphLibrary.INSTANCE.getGraphStatistics(this.pointer, si)) { + if (!jnaResponse.success()) { + String errMsg = jnaResponse.getErrMsg(); + throw new IOException(errMsg); + } + return Statistics.parseFrom(jnaResponse.getData()); + } + } + @Override public void ingestExternalFile(ExternalStorage storage, String sstPath) throws IOException { String[] items = sstPath.split("/"); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java index 05351e292ede..0c9ee6b9d47c 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java +++ 
b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java @@ -14,7 +14,6 @@ package com.alibaba.graphscope.groot.servers; import com.alibaba.graphscope.groot.CuratorUtils; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; @@ -27,6 +26,7 @@ import com.alibaba.graphscope.groot.coordinator.backup.StoreBackupClient; import com.alibaba.graphscope.groot.coordinator.backup.StoreBackupTaskSender; import com.alibaba.graphscope.groot.discovery.*; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.meta.DefaultMetaService; import com.alibaba.graphscope.groot.meta.FileMetaStore; import com.alibaba.graphscope.groot.meta.MetaService; @@ -92,16 +92,20 @@ public Coordinator(Configs configs) { new RoleClients<>(this.channelManager, RoleType.FRONTEND, IngestorWriteClient::new); DdlWriter ddlWriter = new DdlWriter(ingestorWriteClients); this.metaService = new DefaultMetaService(configs); + int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); + int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); RoleClients storeSchemaClients = new RoleClients<>(this.channelManager, RoleType.STORE, StoreSchemaClient::new); - GraphDefFetcher graphDefFetcher = new GraphDefFetcher(storeSchemaClients); + GraphDefFetcher graphDefFetcher = new GraphDefFetcher(storeSchemaClients, storeCount); this.schemaManager = new SchemaManager( + configs, this.snapshotManager, ddlExecutors, ddlWriter, this.metaService, - graphDefFetcher); + graphDefFetcher, + frontendSnapshotClients); this.snapshotNotifier = new SnapshotNotifier( this.discovery, diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java 
b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 9228b79e65bd..be77141b1fac 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -14,7 +14,6 @@ package com.alibaba.graphscope.groot.servers; import com.alibaba.graphscope.groot.CuratorUtils; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; @@ -26,6 +25,7 @@ import com.alibaba.graphscope.groot.discovery.NodeDiscovery; import com.alibaba.graphscope.groot.discovery.ZkDiscovery; import com.alibaba.graphscope.groot.frontend.*; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.frontend.write.DefaultEdgeIdGenerator; import com.alibaba.graphscope.groot.frontend.write.EdgeIdGenerator; import com.alibaba.graphscope.groot.frontend.write.GraphWriter; diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootIrMetaReader.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootIrMetaReader.java index a1a7d5f3ded7..f2cfa8ad2c85 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootIrMetaReader.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GrootIrMetaReader.java @@ -26,8 +26,6 @@ import com.alibaba.graphscope.groot.common.schema.api.SchemaFetcher; import com.google.common.base.Preconditions; -import org.apache.commons.lang3.NotImplementedException; - import java.io.IOException; import java.util.*; @@ -50,7 +48,6 @@ public IrMeta readMeta() throws IOException { @Override public GraphStatistics readStats(GraphId 
graphId) throws IOException { - // TODO: add statistics, otherwise, the CBO will not work - throw new NotImplementedException("reading graph statistics in groot is unimplemented yet"); + return schemaFetcher.getStatistics(); } } diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/BackupManagerTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/BackupManagerTest.java index 8dee363fae5b..47f7bb8cba96 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/BackupManagerTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/BackupManagerTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.*; import com.alibaba.graphscope.groot.CompletionCallback; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.SnapshotWithSchema; import com.alibaba.graphscope.groot.common.config.BackupConfig; import com.alibaba.graphscope.groot.common.config.CommonConfig; @@ -30,6 +29,7 @@ import com.alibaba.graphscope.groot.coordinator.*; import com.alibaba.graphscope.groot.coordinator.backup.BackupManager; import com.alibaba.graphscope.groot.coordinator.backup.StoreBackupTaskSender; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.meta.MetaStore; import com.alibaba.graphscope.groot.store.backup.StoreBackupId; diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/CoordinatorRpcTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/CoordinatorRpcTest.java index d6c04282c5c6..5ce6946ada60 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/CoordinatorRpcTest.java +++ 
b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/CoordinatorRpcTest.java @@ -44,7 +44,7 @@ void testGraphDefFetcher() { StoreSchemaClient client = mock(StoreSchemaClient.class); when(clients.getClient(0)).thenReturn(client); - GraphDefFetcher graphDefFetcher = new GraphDefFetcher(clients); + GraphDefFetcher graphDefFetcher = new GraphDefFetcher(clients, 1); graphDefFetcher.fetchGraphDef(); verify(client).fetchSchema(); } diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/LocalSnapshotListenerTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/LocalSnapshotListenerTest.java index 5578a396c942..90ab805c55bd 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/LocalSnapshotListenerTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/LocalSnapshotListenerTest.java @@ -15,10 +15,10 @@ import static org.mockito.Mockito.*; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.coordinator.SchemaManager; import com.alibaba.graphscope.groot.coordinator.backup.LocalSnapshotListener; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import org.junit.jupiter.api.Test; diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/SchemaManagerTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/SchemaManagerTest.java index 698b1a453a33..0a656ab110cf 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/SchemaManagerTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/coordinator/SchemaManagerTest.java @@ -21,18 +21,17 @@ 
import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.SnapshotListener; +import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyDef; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.alibaba.graphscope.groot.common.schema.wrapper.TypeDef; import com.alibaba.graphscope.groot.common.schema.wrapper.TypeEnum; -import com.alibaba.graphscope.groot.coordinator.DdlWriter; -import com.alibaba.graphscope.groot.coordinator.GraphDefFetcher; -import com.alibaba.graphscope.groot.coordinator.SchemaManager; -import com.alibaba.graphscope.groot.coordinator.SnapshotManager; +import com.alibaba.graphscope.groot.coordinator.*; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.operation.BatchId; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.schema.ddl.DdlExecutors; import com.alibaba.graphscope.groot.schema.request.CreateVertexTypeRequest; import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch; @@ -69,13 +68,17 @@ void testSchemaManager() throws IOException, InterruptedException { GraphDef initialGraphDef = GraphDef.newBuilder().build(); when(mockGraphDefFetcher.fetchGraphDef()).thenReturn(initialGraphDef); + RoleClients frontendSnapshotClients = mock(RoleClients.class); + Configs configs = Configs.newBuilder().put("frontend.node.count", "1").build(); SchemaManager schemaManager = new SchemaManager( + configs, mockSnapshotManager, ddlExecutors, mockDdlWriter, mockMetaService, - mockGraphDefFetcher); + mockGraphDefFetcher, + frontendSnapshotClients); schemaManager.start(); assertEquals(initialGraphDef, schemaManager.getGraphDef()); diff --git 
a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java index 016f825e1283..9df3098ffe46 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java @@ -17,12 +17,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.SnapshotWithSchema; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.BackupInfo; import com.alibaba.graphscope.groot.frontend.*; import com.alibaba.graphscope.groot.frontend.FrontendSnapshotService; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch; import com.alibaba.graphscope.proto.groot.*; diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/SnapshotCacheTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/SnapshotCacheTest.java index 1f64dd2561f9..ccdb87abb7a6 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/SnapshotCacheTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/SnapshotCacheTest.java @@ -16,9 +16,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; -import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.SnapshotListener; import 
com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; +import com.alibaba.graphscope.groot.frontend.SnapshotCache; import org.junit.jupiter.api.Test; diff --git a/proto/groot/frontend_snapshot_service.proto b/proto/groot/frontend_snapshot_service.proto index 181c7a443e6a..2c8d5efebe47 100644 --- a/proto/groot/frontend_snapshot_service.proto +++ b/proto/groot/frontend_snapshot_service.proto @@ -23,6 +23,7 @@ option java_multiple_files = true; service FrontendSnapshot { rpc advanceQuerySnapshot(AdvanceQuerySnapshotRequest) returns (AdvanceQuerySnapshotResponse); + rpc syncStatistics(SyncStatisticsRequest) returns (SyncStatisticsResponse); } message AdvanceQuerySnapshotRequest { @@ -33,3 +34,11 @@ message AdvanceQuerySnapshotRequest { message AdvanceQuerySnapshotResponse { int64 previousSnapshotId = 1; } + +message SyncStatisticsRequest { + Statistics statistics = 1; +} + +message SyncStatisticsResponse { + +} \ No newline at end of file diff --git a/proto/groot/sdk/model.proto b/proto/groot/sdk/model.proto index aea7eba8d32a..7cb670496aea 100644 --- a/proto/groot/sdk/model.proto +++ b/proto/groot/sdk/model.proto @@ -155,6 +155,23 @@ message StorePropertyListPb { repeated StorePropertyPb properties = 1; } + +message Statistics { + message VertexTypeStatistics { + gs.rpc.graph.LabelIdPb labelId = 1; + uint64 numVertices = 2; + } + message EdgeTypeStatistics { + gs.rpc.graph.EdgeKindPb edgeKind = 1; + uint64 numEdges = 2; + } + int64 snapshotId = 1; + uint64 numVertices = 2; + uint64 numEdges = 3; + repeated VertexTypeStatistics vertexTypeStatistics = 4; + repeated EdgeTypeStatistics edgeTypeStatistics = 5; +} + message ConfigPb { map<string, string> configs = 1; } diff --git a/proto/groot/store_schema_service.proto b/proto/groot/store_schema_service.proto index 645f54556cb6..5fb0433b8de8 100644 --- a/proto/groot/store_schema_service.proto +++ b/proto/groot/store_schema_service.proto @@ -23,6 +23,7 @@ option java_multiple_files = true; service StoreSchema { rpc
fetchSchema(FetchSchemaRequest) returns (FetchSchemaResponse); + rpc fetchStatistics(FetchStatisticsRequest) returns (FetchStatisticsResponse); } message FetchSchemaRequest { @@ -31,3 +32,12 @@ message FetchSchemaRequest { message FetchSchemaResponse { GraphDefPb graphDef = 1; } + +message FetchStatisticsRequest { + int64 snapshotId = 1; +} + +message FetchStatisticsResponse { + map<int32, Statistics> statistics_map = 1; +} +