Skip to content

Commit

Permalink
fix(interactive): Fix bug in scan operator (#4221)
Browse files Browse the repository at this point in the history
Add type check in scan operator, and ensure the error message can be
sent back to client.
  • Loading branch information
zhanglei1949 authored Sep 12, 2024
1 parent bb5ebbd commit eeececf
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 113 deletions.
9 changes: 6 additions & 3 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
}

for (size_t i = 0; i < MAX_RETRY; ++i) {
result_buffer.clear();
if (app->run(*this, decoder, encoder)) {
const auto end = std::chrono::high_resolution_clock::now();
app_metrics_[type].add_record(
Expand All @@ -160,7 +161,6 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
}

decoder.reset(sv.data(), sv.size());
result_buffer.clear();
}

const auto end = std::chrono::high_resolution_clock::now();
Expand All @@ -172,10 +172,13 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
// output buffer.
// For example, for adhoc_app.cc, if the query failed, the error info will
// be put in the output buffer.
if (result_buffer.size() > 0) {
if (result_buffer.size() > 4) {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
std::string{result_buffer.data(), result_buffer.size()}, result_buffer);
std::string{result_buffer.data() + 4,
result_buffer.size() -
4}, // The first 4 bytes are the length of the message.
result_buffer);
} else {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
Expand Down
193 changes: 86 additions & 107 deletions flex/engines/graph_db/runtime/adhoc/operators/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,57 +98,52 @@ static bool is_find_vertex(const physical::Scan& scan_opr,
return true;
}

bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
const std::map<std::string, std::string>& params,
std::vector<int64_t>& oids, bool& scan_oid) {
// todo unsupported cases.
if (predicate.or_predicates_size() != 1) {
return false;
}
// todo unsupported cases.
if (predicate.or_predicates(0).predicates_size() != 1) {
return false;
}
const algebra::IndexPredicate_Triplet& triplet =
predicate.or_predicates(0).predicates(0);
if (!triplet.has_key()) {
return false;
}
auto key = triplet.key();
if (key.has_key()) {
scan_oid = true;
} else if (key.has_id()) {
scan_oid = false;
static bl::result<Context> scan_vertices_expr_impl(
bool scan_oid, const std::vector<Any>& input_ids,
const ReadTransaction& txn, const ScanParams& scan_params,
std::unique_ptr<ExprBase> expr) {
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params,
[&expr, input_ids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
input_ids);
} else {
LOG(FATAL) << "unexpected key case";
}
if (triplet.cmp() != common::Logical::EQ && triplet.cmp() != common::WITHIN) {
return false;
std::vector<int64_t> gids;
for (size_t i = 0; i < input_ids.size(); i++) {
if (input_ids[i].type != PropertyType::Int64()) {
RETURN_BAD_REQUEST_ERROR("Expect int64 type for global id");
}
gids.push_back(input_ids[i].AsInt64());
}
return Scan::filter_gids(
txn, scan_params,
[&expr, input_ids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
gids);
}
}

if (triplet.value_case() ==
algebra::IndexPredicate_Triplet::ValueCase::kConst) {
if (triplet.const_().item_case() == common::Value::kI32) {
oids.emplace_back(triplet.const_().i32());
} else if (triplet.const_().item_case() == common::Value::kI64) {
oids.emplace_back(triplet.const_().i64());
} else if (triplet.const_().item_case() == common::Value::kI64Array) {
const auto& arr = triplet.const_().i64_array();
for (int i = 0; i < arr.item_size(); ++i) {
oids.emplace_back(arr.item(i));
static bl::result<Context> scan_vertices_no_expr_impl(
bool scan_oid, const std::vector<Any>& input_ids,
const ReadTransaction& txn, const ScanParams& scan_params) {
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params, [](label_t label, vid_t vid) { return true; },
input_ids);
} else {
std::vector<int64_t> gids;
for (size_t i = 0; i < input_ids.size(); i++) {
if (input_ids[i].type != PropertyType::Int64()) {
RETURN_BAD_REQUEST_ERROR("Expect int64 type for global id");
}

} else {
return false;
gids.push_back(input_ids[i].AsInt64());
}
} else if (triplet.value_case() ==
algebra::IndexPredicate_Triplet::ValueCase::kParam) {
const common::DynamicParam& p = triplet.param();
std::string name = p.name();
std::string value = params.at(name);
oids.emplace_back(std::stoll(value));
return Scan::filter_gids(
txn, scan_params, [](label_t, vid_t) { return true; }, gids);
}
return true;
}

bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
Expand Down Expand Up @@ -276,86 +271,66 @@ bl::result<Context> eval_scan(
Context ctx;
auto expr = parse_expression(
txn, ctx, params, scan_opr_params.predicate(), VarType::kVertexVar);
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
} else {
return Scan::filter_gids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
std::vector<Any> oids{};
if (!parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid)) {
LOG(ERROR) << "parse idx predicate failed";
RETURN_UNSUPPORTED_ERROR("parse idx predicate failed");
}
std::vector<Any> valid_oids;
// In this case, we expect the type consistent with pk
for (const auto& oid : oids) {
if (oid.type == PropertyType::Int64()) {
valid_oids.push_back(oid);
} else if (oid.type == PropertyType::Int32()) {
valid_oids.push_back(Any::From<int64_t>(oid.AsInt32()));
} else {
LOG(ERROR) << "Expect int64 type for global id, but got: "
<< oid.type;
RETURN_BAD_REQUEST_ERROR(
"Expect int64 type for global id, but got: " +
oid.type.ToString());
}
}
return scan_vertices_expr_impl(scan_oid, valid_oids, txn, scan_params,
std::move(expr));
}

if (scan_opr.has_idx_predicate()) {
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));

if (scan_oid) {
return Scan::filter_oids(
txn, scan_params, [](label_t label, vid_t vid) { return true; },
oids);
} else {
return Scan::filter_gids(
txn, scan_params, [](label_t, vid_t) { return true; }, oids);
std::vector<Any> oids{};
if (!parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid)) {
LOG(ERROR) << "parse idx predicate failed: "
<< scan_opr.DebugString();
RETURN_UNSUPPORTED_ERROR("parse idx predicate failed");
}
return scan_vertices_no_expr_impl(scan_oid, oids, txn, scan_params);
}
} else if (scan_opr.has_idx_predicate()) {
if (scan_opr.has_idx_predicate() && scan_opr_params.has_predicate()) {
Context ctx;
auto expr = parse_expression(
txn, ctx, params, scan_opr_params.predicate(), VarType::kVertexVar);
std::vector<Any> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
} else {
std::vector<int64_t> gids;
for (size_t i = 0; i < oids.size(); i++) {
gids.push_back(oids[i].AsInt64());
}
return Scan::filter_gids(
txn, scan_params,
[&expr, gids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
gids);
if (!parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid)) {
LOG(ERROR) << "parse idx predicate failed: "
<< scan_opr.DebugString();
RETURN_UNSUPPORTED_ERROR("parse idx predicate failed");
}
return scan_vertices_expr_impl(scan_oid, oids, txn, scan_params,
std::move(expr));
}

if (scan_opr.has_idx_predicate()) {
std::vector<Any> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));

if (scan_oid) {
return Scan::filter_oids(
txn, scan_params, [](label_t label, vid_t vid) { return true; },
oids);
} else {
std::vector<int64_t> gids;
for (size_t i = 0; i < oids.size(); i++) {
gids.push_back(oids[i].AsInt64());
}
return Scan::filter_gids(
txn, scan_params, [](label_t, vid_t) { return true; }, gids);
if (!parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid)) {
LOG(ERROR) << "parse idx predicate failed: "
<< scan_opr.DebugString();
RETURN_UNSUPPORTED_ERROR("parse idx predicate failed");
}
return scan_vertices_no_expr_impl(scan_oid, oids, txn, scan_params);
}
}

Expand All @@ -380,6 +355,10 @@ bl::result<Context> eval_scan(
return Scan::scan_vertex(txn, scan_params,
[](label_t, vid_t) { return true; });
}
} else {
LOG(ERROR) << "unsupport scan option " << scan_opr.DebugString()
<< " we only support scan vertex currently";
RETURN_UNSUPPORTED_ERROR("unsupport scan option " + scan_opr.DebugString());
}
LOG(ERROR) << "unsupport scan option " << scan_opr.DebugString()
<< " we only support scan vertex currently";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.alibaba.graphscope.interactive.ApiException;
import com.alibaba.graphscope.interactive.ApiResponse;
import com.alibaba.graphscope.proto.Code;
import com.alibaba.graphscope.interactive.proto.Code;

/***
* A class which wrap the result of the API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.alibaba.graphscope.interactive.ApiException;
import com.alibaba.graphscope.interactive.ApiResponse;
import com.alibaba.graphscope.proto.Code;
import com.alibaba.graphscope.interactive.proto.Code;

/**
* Mapping http status code to our status code, along with a message
Expand Down
2 changes: 1 addition & 1 deletion proto/error/interactive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ syntax = "proto3";

package gs.flex.interactive;

option java_package = "com.alibaba.graphscope.proto";
option java_package = "com.alibaba.graphscope.interactive.proto";
option java_multiple_files = true;


Expand Down

0 comments on commit eeececf

Please sign in to comment.