Skip to content

Commit

Permalink
fix(analytical): Support extend label data for graph in eager mode. (#…
Browse files Browse the repository at this point in the history
…3288)

Fixes #411

Signed-off-by: SighingSnow <[email protected]>
Co-authored-by: Siyuan Zhang <[email protected]>
  • Loading branch information
SighingSnow and siyuan0322 authored Dec 18, 2023
1 parent c28caca commit 7f27fa3
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 26 deletions.
9 changes: 9 additions & 0 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ struct Graph {
bool retain_oid = true;
bool compact_edges = false;
bool use_perfect_hash = false;
// This is used to extend the label data
// when user try to add data to existed labels.
// the available option is 0/1/2,
// 0 stands for no extend,
// 1 stands for extend vertex label data,
// 2 stands for extend edge label data.
int extend_type = 0;

std::string SerializeToString() const {
std::stringstream ss;
Expand Down Expand Up @@ -289,13 +296,15 @@ inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
BOOST_LEAF_AUTO(compact_edges, params.Get<bool>(rpc::COMPACT_EDGES, false));
BOOST_LEAF_AUTO(use_perfect_hash,
params.Get<bool>(rpc::USE_PERFECT_HASH, false));
BOOST_LEAF_AUTO(extend_type, params.Get<int64_t>(rpc::EXTEND_LABEL_DATA, 0));

auto graph = std::make_shared<detail::Graph>();
graph->directed = directed;
graph->generate_eid = generate_eid;
graph->retain_oid = retain_oid;
graph->compact_edges = compact_edges;
graph->use_perfect_hash = use_perfect_hash;
graph->extend_type = extend_type;

const auto& large_attr = params.GetLargeAttr();
for (const auto& item : large_attr.chunk_list().items()) {
Expand Down
69 changes: 69 additions & 0 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,82 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader<OID_T, VID_T> {
return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedVLable(
vineyard::ObjectID frag_id, label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedVLabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedELable(
vineyard::ObjectID frag_id, label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedELabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

boost::leaf::result<vineyard::ObjectID> AddLabelsToFragmentAsFragmentGroup(
vineyard::ObjectID frag_id) {
BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id));
VY_OK_OR_RAISE(client_.Persist(new_frag_id));
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<vineyard::ObjectID> ExtendLabelData(vineyard::ObjectID frag_id,
int extend_type) {
// find duplicate label id
assert(extend_type);
auto frag = std::dynamic_pointer_cast<vineyard::ArrowFragmentBase>(
client_.GetObject(frag_id));
vineyard::PropertyGraphSchema schema = frag->schema();
std::vector<std::string> labels;
label_id_t target_label_id = -1;
if (extend_type == 1)
labels = schema.GetVertexLabels();
else if (extend_type == 2)
labels = schema.GetEdgeLabels();

std::map<std::string, label_id_t> label_set;
for (size_t i = 0; i < labels.size(); ++i)
label_set[labels[i]] = i;

if (extend_type == 1) {
for (size_t i = 0; i < graph_info_->vertices.size(); ++i) {
auto it = label_set.find(graph_info_->vertices[i]->label);
if (it != label_set.end()) {
target_label_id = it->second;
break;
}
}
} else if (extend_type == 2) {
for (size_t i = 0; i < graph_info_->edges.size(); ++i) {
auto it = label_set.find(graph_info_->edges[i]->label);
if (it != label_set.end()) {
target_label_id = it->second;
break;
}
}
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"extend type is invalid");
}

if (target_label_id == -1)
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"label not found");
vineyard::ObjectID new_frag_id;
if (extend_type == 1) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedVLable(frag_id, target_label_id));
} else if (extend_type == 2) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedELable(frag_id, target_label_id));
}
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<void> initPartitioner() {
#ifdef HASH_PARTITION
Base::partitioner_.Init(comm_spec_.fnum());
Expand Down
11 changes: 9 additions & 2 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,16 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
using loader_t = gs::arrow_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info);
vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID();

BOOST_LEAF_AUTO(frag_group_id,
loader.AddLabelsToFragmentAsFragmentGroup(origin_frag_id));
if (graph_info->extend_type) {
BOOST_LEAF_ASSIGN(
frag_group_id,
loader.ExtendLabelData(origin_frag_id, graph_info->extend_type));
} else {
BOOST_LEAF_ASSIGN(frag_group_id, loader.AddLabelsToFragmentAsFragmentGroup(
origin_frag_id));
}
MPI_Barrier(comm_spec.comm());

LOG_IF(INFO, comm_spec.worker_id() == 0)
Expand Down
3 changes: 3 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ enum ParamKey {
IS_FROM_GAR = 70;
GRAPH_INFO_PATH = 71;

// Extend label data
EXTEND_LABEL_DATA = 80;

APP_NAME = 100;
APP_ALGO = 101;
APP_LIBRARY_PATH = 102;
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def add_labels_to_graph(graph, loader_op):
config = {
types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data),
types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
Expand Down
23 changes: 18 additions & 5 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self):
self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP
self._compact_edges = False
self._use_perfect_hash = False
self._extend_label_data = 0

@property
def session_id(self):
Expand Down Expand Up @@ -215,6 +216,7 @@ def _construct_op_of_empty_graph(self):
config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map)
config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges)
config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash)
config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data)
return dag_utils.create_graph(
self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config
)
Expand Down Expand Up @@ -304,6 +306,11 @@ def __init__(
self._vertex_map = utils.vertex_map_type_to_enum(vertex_map)
self._compact_edges = compact_edges
self._use_perfect_hash = use_perfect_hash
# for need to extend label in 'eager mode' when add_vertices and add_edges
# 0 - not extending label
# 1 - extend vertex label
# 2 - extend edge label
self._extend_label_data = 0

# list of pair <parent_op_key, VertexLabel/EdgeLabel>
self._unsealed_vertices_and_edges = list()
Expand Down Expand Up @@ -505,10 +512,15 @@ def add_vertices(
"Cannot incrementally add vertices to graphs with compacted edges, "
"please use `graphscope.load_from()` instead."
)
if label in self._v_labels:
raise ValueError(f"Label {label} already existed in graph.")
if not self._v_labels and self._e_labels:
raise ValueError("Cannot manually add vertices after inferred vertices.")
# currently not support local_vertex_map
if label in self._v_labels:
self._extend_label_data = 1
warnings.warn(
f"Label {label} already existed in graph"
", origin label data will be extend."
)
unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
vertex_label = VertexLabel(
label=label,
Expand All @@ -520,7 +532,8 @@ def add_vertices(
)
unsealed_vertices_and_edges.append((self.op.key, vertex_label))
v_labels = deepcopy(self._v_labels)
v_labels.append(label)
if self._extend_label_data == 0:
v_labels.append(label)
# generate and add a loader op to dag
loader_op = dag_utils.create_loader(vertex_label)
self._session.dag.add_op(loader_op)
Expand Down Expand Up @@ -616,7 +629,7 @@ def add_edges(

if self.evaluated:
if label in self._e_labels:
raise ValueError(f"Label {label} already existed in graph")
self._extend_label_data = 2

unsealed_vertices = list()
unsealed_edges = list()
Expand All @@ -634,7 +647,7 @@ def add_edges(
v_labels.append(dst_label)

parent = self
if label in self.e_labels:
if not self.evaluated and label in self.e_labels:
# aggregate op with the same edge label
fork = False
unsealed_vertices_and_edges = list()
Expand Down
10 changes: 0 additions & 10 deletions python/graphscope/tests/unittest/test_create_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,6 @@ def test_error_on_ambigious_default_label(
graph = graph.add_edges(student_group_e, "group")


def test_error_on_duplicate_labels(graphscope_session, student_group_e, student_v):
graph = graphscope_session.g()
graph = graph.add_vertices(student_v, "student")
with pytest.raises(ValueError, match="Label student already existed in graph"):
graph = graph.add_vertices(student_v, "student")
graph = graph.add_edges(student_group_e, "group")
with pytest.raises(ValueError, match="already existed in graph"):
graph = graph.add_edges(student_group_e, "group")


def test_load_complex_graph(
graphscope_session,
score_e,
Expand Down
55 changes: 46 additions & 9 deletions python/graphscope/tests/unittest/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os

import numpy as np
import pandas as pd
import pytest

import graphscope
Expand Down Expand Up @@ -278,15 +279,6 @@ def test_add_vertices_edges(graphscope_session):
graph = graph.add_vertices(
Loader(f"{prefix}/software.csv", delimiter="|"), "software"
)

with pytest.raises(ValueError, match="already existed in graph"):
graph = graph.add_edges(
Loader(f"{prefix}/knows.csv", delimiter="|"),
"knows",
src_label="software",
dst_label="software",
)

graph = graph.add_edges(
Loader(f"{prefix}/created.csv", delimiter="|"),
"created",
Expand All @@ -298,6 +290,51 @@ def test_add_vertices_edges(graphscope_session):
assert graph.schema.edge_labels == ["knows", "created"]


def test_extend_vertices_edges(graphscope_session):
prefix = os.path.expandvars("${GS_TEST_DIR}/")
verts = pd.read_csv(f"{prefix}/p2p_v.csv")
edges = pd.read_csv(f"{prefix}/p2p_e.csv")
test_list = ["v11308", "v50089", "v60129"]

g1 = graphscope_session.g(oid_type="std::string")
g1 = g1.add_vertices(Loader(verts), "person")
g1 = g1.add_edges(Loader(edges), "knows", src_label="person", dst_label="person")

g2 = graphscope_session.g(oid_type="std::string")
g2 = g2.add_vertices(Loader(verts[:12980]), "person")
g2 = g2.add_vertices(Loader(verts[12980:31530]), "person")
g2 = g2.add_vertices(Loader(verts[31530:]), "person")

g2 = g2.add_edges(
Loader(edges[:2302]), "knows", src_label="person", dst_label="person"
)
g2 = g2.add_edges(
Loader(edges[2302:40021]), "knows", src_label="person", dst_label="person"
)
g2 = g2.add_edges(
Loader(edges[40021:]), "knows", src_label="person", dst_label="person"
)

sg1 = g1.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]})
sg2 = g2.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]})
for src in test_list:
res1 = graphscope.sssp(sg1, src=src, weight="dist")
res2 = graphscope.sssp(sg2, src=src, weight="dist")
df1 = res1.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values(
by=["id"], ignore_index=True
)
df2 = res2.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values(
by=["id"], ignore_index=True
)
if not df1.equals(df2):
pytest.raises(
AssertionError, "different sssp result got after extending graph data"
)

del g1, g2, sg1, sg2
print("pass graph extending test")


def test_complicated_add_edges(graphscope_session):
prefix = os.path.expandvars("${GS_TEST_DIR}/modern_graph")
graph = graphscope_session.g()
Expand Down

0 comments on commit 7f27fa3

Please sign in to comment.