From 7f27fa3fef3903369f25177ab2c346efcc20e03f Mon Sep 17 00:00:00 2001 From: Song Tingyu <53935948+SighingSnow@users.noreply.github.com> Date: Mon, 18 Dec 2023 19:10:52 +0800 Subject: [PATCH] fix(analytical): Support extend label data for graph in eager mode. (#3288) Fixes #411 Signed-off-by: SighingSnow <1263750383@qq.com> Co-authored-by: Siyuan Zhang --- analytical_engine/core/io/property_parser.h | 9 +++ .../core/loader/arrow_fragment_loader.h | 69 +++++++++++++++++++ .../frame/property_graph_frame.cc | 11 ++- proto/types.proto | 3 + python/graphscope/framework/dag_utils.py | 1 + python/graphscope/framework/graph.py | 23 +++++-- .../tests/unittest/test_create_graph.py | 10 --- .../graphscope/tests/unittest/test_graph.py | 55 ++++++++++++--- 8 files changed, 155 insertions(+), 26 deletions(-) diff --git a/analytical_engine/core/io/property_parser.h b/analytical_engine/core/io/property_parser.h index acf3e1da25ca..d8ae06647b1c 100644 --- a/analytical_engine/core/io/property_parser.h +++ b/analytical_engine/core/io/property_parser.h @@ -133,6 +133,13 @@ struct Graph { bool retain_oid = true; bool compact_edges = false; bool use_perfect_hash = false; + // This is used to extend the label data + // when user try to add data to existed labels. + // the available option is 0/1/2, + // 0 stands for no extend, + // 1 stands for extend vertex label data, + // 2 stands for extend edge label data. + int extend_type = 0; std::string SerializeToString() const { std::stringstream ss; @@ -289,6 +296,7 @@ inline bl::result> ParseCreatePropertyGraph( BOOST_LEAF_AUTO(compact_edges, params.Get(rpc::COMPACT_EDGES, false)); BOOST_LEAF_AUTO(use_perfect_hash, params.Get(rpc::USE_PERFECT_HASH, false)); + BOOST_LEAF_AUTO(extend_type, params.Get(rpc::EXTEND_LABEL_DATA, 0)); auto graph = std::make_shared(); graph->directed = directed; @@ -296,6 +304,7 @@ inline bl::result> ParseCreatePropertyGraph( graph->retain_oid = retain_oid; graph->compact_edges = compact_edges; graph->use_perfect_hash = use_perfect_hash; + graph->extend_type = extend_type; const auto& large_attr = params.GetLargeAttr(); for (const auto& item : large_attr.chunk_list().items()) { diff --git a/analytical_engine/core/loader/arrow_fragment_loader.h b/analytical_engine/core/loader/arrow_fragment_loader.h index 1a8c369f8755..6db66e16535a 100644 --- a/analytical_engine/core/loader/arrow_fragment_loader.h +++ b/analytical_engine/core/loader/arrow_fragment_loader.h @@ -272,6 +272,22 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader { return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables)); } + bl::result AddDataToExistedVLable( + vineyard::ObjectID frag_id, label_id_t label_id) { + BOOST_LEAF_CHECK(initPartitioner()); + BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); + return Base::addDataToExistedVLabel(frag_id, label_id, + std::move(raw_v_e_tables)); + } + + bl::result AddDataToExistedELable( + vineyard::ObjectID frag_id, label_id_t label_id) { + BOOST_LEAF_CHECK(initPartitioner()); + BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); + return Base::addDataToExistedELabel(frag_id, label_id, + std::move(raw_v_e_tables)); + } + boost::leaf::result AddLabelsToFragmentAsFragmentGroup( vineyard::ObjectID frag_id) { BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id)); @@ -279,6 +295,59 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader { return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); } + bl::result ExtendLabelData(vineyard::ObjectID frag_id, + int extend_type) { + // find duplicate label id + assert(extend_type); + auto frag = std::dynamic_pointer_cast( + client_.GetObject(frag_id)); + vineyard::PropertyGraphSchema schema = frag->schema(); + std::vector labels; + label_id_t target_label_id = -1; + if (extend_type == 1) + labels = schema.GetVertexLabels(); + else if (extend_type == 2) + labels = schema.GetEdgeLabels(); + + std::map label_set; + for (size_t i = 0; i < labels.size(); ++i) + label_set[labels[i]] = i; + + if (extend_type == 1) { + for (size_t i = 0; i < graph_info_->vertices.size(); ++i) { + auto it = label_set.find(graph_info_->vertices[i]->label); + if (it != label_set.end()) { + target_label_id = it->second; + break; + } + } + } else if (extend_type == 2) { + for (size_t i = 0; i < graph_info_->edges.size(); ++i) { + auto it = label_set.find(graph_info_->edges[i]->label); + if (it != label_set.end()) { + target_label_id = it->second; + break; + } + } + } else { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "extend type is invalid"); + } + + if (target_label_id == -1) + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "label not found"); + vineyard::ObjectID new_frag_id; + if (extend_type == 1) { + BOOST_LEAF_ASSIGN(new_frag_id, + AddDataToExistedVLable(frag_id, target_label_id)); + } else if (extend_type == 2) { + BOOST_LEAF_ASSIGN(new_frag_id, + AddDataToExistedELable(frag_id, target_label_id)); + } + return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); + } + bl::result initPartitioner() { #ifdef HASH_PARTITION Base::partitioner_.Init(comm_spec_.fnum()); diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 6ca408c963df..0b840dad4bf9 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -346,9 +346,16 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id, BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params)); using loader_t = gs::arrow_fragment_loader_t; loader_t loader(client, comm_spec, graph_info); + vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID(); - BOOST_LEAF_AUTO(frag_group_id, - loader.AddLabelsToFragmentAsFragmentGroup(origin_frag_id)); + if (graph_info->extend_type) { + BOOST_LEAF_ASSIGN( + frag_group_id, + loader.ExtendLabelData(origin_frag_id, graph_info->extend_type)); + } else { + BOOST_LEAF_ASSIGN(frag_group_id, loader.AddLabelsToFragmentAsFragmentGroup( + origin_frag_id)); + } MPI_Barrier(comm_spec.comm()); LOG_IF(INFO, comm_spec.worker_id() == 0) diff --git a/proto/types.proto b/proto/types.proto index 82d9f69e1811..2155e7e63d02 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -201,6 +201,9 @@ enum ParamKey { IS_FROM_GAR = 70; GRAPH_INFO_PATH = 71; + // Extend label data + EXTEND_LABEL_DATA = 80; + APP_NAME = 100; APP_ALGO = 101; APP_LIBRARY_PATH = 102; diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index f8fe467a516e..2e977f0bef10 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -202,6 +202,7 @@ def add_labels_to_graph(graph, loader_op): config = { types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type), types_pb2.DIRECTED: utils.b_to_attr(graph._directed), + types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data), types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type), types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type), types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid), diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 281ca3ddf54d..883c2bcf731e 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -65,6 +65,7 @@ def __init__(self): self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP self._compact_edges = False self._use_perfect_hash = False + self._extend_label_data = 0 @property def session_id(self): @@ -215,6 +216,7 @@ def _construct_op_of_empty_graph(self): config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map) config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges) config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash) + config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data) return dag_utils.create_graph( self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config ) @@ -304,6 +306,11 @@ def __init__( self._vertex_map = utils.vertex_map_type_to_enum(vertex_map) self._compact_edges = compact_edges self._use_perfect_hash = use_perfect_hash + # for need to extend label in 'eager mode' when add_vertices and add_edges + # 0 - not extending label + # 1 - extend vertex label + # 2 - extend edge label + self._extend_label_data = 0 # list of pair self._unsealed_vertices_and_edges = list() @@ -505,10 +512,15 @@ def add_vertices( "Cannot incrementally add vertices to graphs with compacted edges, " "please use `graphscope.load_from()` instead." ) - if label in self._v_labels: - raise ValueError(f"Label {label} already existed in graph.") if not self._v_labels and self._e_labels: raise ValueError("Cannot manually add vertices after inferred vertices.") + # currently not support local_vertex_map + if label in self._v_labels: + self._extend_label_data = 1 + warnings.warn( + f"Label {label} already existed in graph" + ", origin label data will be extend." + ) unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges) vertex_label = VertexLabel( label=label, @@ -520,7 +532,8 @@ def add_vertices( ) unsealed_vertices_and_edges.append((self.op.key, vertex_label)) v_labels = deepcopy(self._v_labels) - v_labels.append(label) + if self._extend_label_data == 0: + v_labels.append(label) # generate and add a loader op to dag loader_op = dag_utils.create_loader(vertex_label) self._session.dag.add_op(loader_op) @@ -616,7 +629,7 @@ def add_edges( if self.evaluated: if label in self._e_labels: - raise ValueError(f"Label {label} already existed in graph") + self._extend_label_data = 2 unsealed_vertices = list() unsealed_edges = list() @@ -634,7 +647,7 @@ def add_edges( v_labels.append(dst_label) parent = self - if label in self.e_labels: + if not self.evaluated and label in self.e_labels: # aggregate op with the same edge label fork = False unsealed_vertices_and_edges = list() diff --git a/python/graphscope/tests/unittest/test_create_graph.py b/python/graphscope/tests/unittest/test_create_graph.py index 30eb585ca36d..d5815b776bad 100644 --- a/python/graphscope/tests/unittest/test_create_graph.py +++ b/python/graphscope/tests/unittest/test_create_graph.py @@ -426,16 +426,6 @@ def test_error_on_ambigious_default_label( graph = graph.add_edges(student_group_e, "group") -def test_error_on_duplicate_labels(graphscope_session, student_group_e, student_v): - graph = graphscope_session.g() - graph = graph.add_vertices(student_v, "student") - with pytest.raises(ValueError, match="Label student already existed in graph"): - graph = graph.add_vertices(student_v, "student") - graph = graph.add_edges(student_group_e, "group") - with pytest.raises(ValueError, match="already existed in graph"): - graph = graph.add_edges(student_group_e, "group") - - def test_load_complex_graph( graphscope_session, score_e, diff --git a/python/graphscope/tests/unittest/test_graph.py b/python/graphscope/tests/unittest/test_graph.py index 82b9dba64b0b..92460a7172f2 100644 --- a/python/graphscope/tests/unittest/test_graph.py +++ b/python/graphscope/tests/unittest/test_graph.py @@ -20,6 +20,7 @@ import os import numpy as np +import pandas as pd import pytest import graphscope @@ -278,15 +279,6 @@ def test_add_vertices_edges(graphscope_session): graph = graph.add_vertices( Loader(f"{prefix}/software.csv", delimiter="|"), "software" ) - - with pytest.raises(ValueError, match="already existed in graph"): - graph = graph.add_edges( - Loader(f"{prefix}/knows.csv", delimiter="|"), - "knows", - src_label="software", - dst_label="software", - ) - graph = graph.add_edges( Loader(f"{prefix}/created.csv", delimiter="|"), "created", @@ -298,6 +290,51 @@ def test_add_vertices_edges(graphscope_session): assert graph.schema.edge_labels == ["knows", "created"] +def test_extend_vertices_edges(graphscope_session): + prefix = os.path.expandvars("${GS_TEST_DIR}/") + verts = pd.read_csv(f"{prefix}/p2p_v.csv") + edges = pd.read_csv(f"{prefix}/p2p_e.csv") + test_list = ["v11308", "v50089", "v60129"] + + g1 = graphscope_session.g(oid_type="std::string") + g1 = g1.add_vertices(Loader(verts), "person") + g1 = g1.add_edges(Loader(edges), "knows", src_label="person", dst_label="person") + + g2 = graphscope_session.g(oid_type="std::string") + g2 = g2.add_vertices(Loader(verts[:12980]), "person") + g2 = g2.add_vertices(Loader(verts[12980:31530]), "person") + g2 = g2.add_vertices(Loader(verts[31530:]), "person") + + g2 = g2.add_edges( + Loader(edges[:2302]), "knows", src_label="person", dst_label="person" + ) + g2 = g2.add_edges( + Loader(edges[2302:40021]), "knows", src_label="person", dst_label="person" + ) + g2 = g2.add_edges( + Loader(edges[40021:]), "knows", src_label="person", dst_label="person" + ) + + sg1 = g1.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]}) + sg2 = g2.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]}) + for src in test_list: + res1 = graphscope.sssp(sg1, src=src, weight="dist") + res2 = graphscope.sssp(sg2, src=src, weight="dist") + df1 = res1.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values( + by=["id"], ignore_index=True + ) + df2 = res2.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values( + by=["id"], ignore_index=True + ) + if not df1.equals(df2): + pytest.raises( + AssertionError, "different sssp result got after extending graph data" + ) + + del g1, g2, sg1, sg2 + print("pass graph extending test") + + def test_complicated_add_edges(graphscope_session): prefix = os.path.expandvars("${GS_TEST_DIR}/modern_graph") graph = graphscope_session.g()