Skip to content

Commit

Permalink
fix(interactive): Allow duplicate vertices and edges with invalid vert…
Browse files Browse the repository at this point in the history
…ex id in source data (#3939)

- When there are duplicate vertices, only the first will be retained.

- Edges with an invalid vertex id will be discarded.
  • Loading branch information
liulx20 authored Jun 19, 2024
1 parent ce17b4c commit 10525cf
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,18 @@ void set_vertex_column_from_string_array(
std::static_pointer_cast<arrow::LargeStringArray>(array->chunk(j));
for (auto k = 0; k < casted->length(); ++k) {
auto str = casted->GetView(k);
std::string_view sw(str.data(), str.size());
col->set_any(vids[cur_ind++], std::move(sw));
std::string_view sw;
if (casted->IsNull(k)) {
VLOG(1) << "Found null string in vertex property.";
sw = "";
} else {
sw = std::string_view(str.data(), str.size());
}
if (vids[cur_ind] == std::numeric_limits<vid_t>::max()) {
cur_ind++;
} else {
col->set_any(vids[cur_ind++], std::move(sw));
}
}
}
} else {
Expand All @@ -53,7 +63,11 @@ void set_vertex_column_from_string_array(
for (auto k = 0; k < casted->length(); ++k) {
auto str = casted->GetView(k);
std::string_view sw(str.data(), str.size());
col->set_any(vids[cur_ind++], std::move(sw));
if (vids[cur_ind] == std::numeric_limits<vid_t>::max()) {
cur_ind++;
} else {
col->set_any(vids[cur_ind++], std::move(sw));
}
}
}
}
Expand Down Expand Up @@ -104,8 +118,12 @@ void set_vertex_column_from_timestamp_array(
auto casted =
std::static_pointer_cast<arrow::TimestampArray>(array->chunk(j));
for (auto k = 0; k < casted->length(); ++k) {
col->set_any(vids[cur_ind++],
std::move(AnyConverter<Date>::to_any(casted->Value(k))));
if (vids[cur_ind] == std::numeric_limits<vid_t>::max()) {
cur_ind++;
} else {
col->set_any(vids[cur_ind++],
std::move(AnyConverter<Date>::to_any(casted->Value(k))));
}
}
}
} else {
Expand All @@ -125,8 +143,12 @@ void set_vertex_column_from_timestamp_array_to_day(
auto casted =
std::static_pointer_cast<arrow::TimestampArray>(array->chunk(j));
for (auto k = 0; k < casted->length(); ++k) {
col->set_any(vids[cur_ind++],
std::move(AnyConverter<Day>::to_any(casted->Value(k))));
if (vids[cur_ind] == std::numeric_limits<vid_t>::max()) {
cur_ind++;
} else {
col->set_any(vids[cur_ind++],
std::move(AnyConverter<Day>::to_any(casted->Value(k))));
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ struct _add_vertex {
auto casted_array = std::static_pointer_cast<arrow_array_t>(col);
for (size_t i = 0; i < row_num; ++i) {
if (!indexer.add(casted_array->Value(i), vid)) {
LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(i)
<< "..";
VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << "..";
vids.emplace_back(std::numeric_limits<vid_t>::max());
} else {
vids.emplace_back(vid);
}
vids.emplace_back(vid);
}
} else {
if (col->type()->Equals(arrow::utf8())) {
Expand All @@ -115,9 +116,11 @@ struct _add_vertex {
auto str = casted_array->GetView(i);
std::string_view str_view(str.data(), str.size());
if (!indexer.add(str_view, vid)) {
LOG(FATAL) << "Duplicate vertex id: " << str_view << "..";
VLOG(2) << "Duplicate vertex id: " << str_view << "..";
vids.emplace_back(std::numeric_limits<vid_t>::max());
} else {
vids.emplace_back(vid);
}
vids.emplace_back(vid);
}
} else if (col->type()->Equals(arrow::large_utf8())) {
auto casted_array =
Expand All @@ -126,9 +129,11 @@ struct _add_vertex {
auto str = casted_array->GetView(i);
std::string_view str_view(str.data(), str.size());
if (!indexer.add(str_view, vid)) {
LOG(FATAL) << "Duplicate vertex id: " << str_view << "..";
VLOG(2) << "Duplicate vertex id: " << str_view << "..";
vids.emplace_back(std::numeric_limits<vid_t>::max());
} else {
vids.emplace_back(vid);
}
vids.emplace_back(vid);
}
} else {
LOG(FATAL) << "Not support type: " << col->type()->ToString();
Expand Down Expand Up @@ -182,6 +187,7 @@ void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
const IndexerType& indexer,
std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
std::vector<int32_t>& degree) {
static constexpr auto invalid_vid = std::numeric_limits<vid_t>::max();
if constexpr (std::is_same_v<PK_T, std::string_view>) {
if (col->type()->Equals(arrow::utf8())) {
auto casted = std::static_pointer_cast<arrow::StringArray>(col);
Expand All @@ -194,7 +200,9 @@ void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
degree[vid]++;
if (vid != invalid_vid) {
degree[vid]++;
}
}
} else {
// must be large utf8
Expand All @@ -208,7 +216,9 @@ void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
degree[vid]++;
if (vid != invalid_vid) {
degree[vid]++;
}
}
}
} else {
Expand All @@ -221,7 +231,9 @@ void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
degree[vid]++;
if (vid != invalid_vid) {
degree[vid]++;
}
}
}
}
Expand Down

0 comments on commit 10525cf

Please sign in to comment.